]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Thread pool: Convert threads to joinable
author Alberto Leiva Popper <ydahhrk@gmail.com>
Mon, 7 Jun 2021 19:22:17 +0000 (14:22 -0500)
committer Alberto Leiva Popper <ydahhrk@gmail.com>
Fri, 11 Jun 2021 01:09:01 +0000 (20:09 -0500)
Hypothesis: Something (which I haven't spotted yet) was causing the
main thread to skip its wait before the pool threads finished their
tasks. Maybe something to do with the ready signal again?

So the main thread returned early, which means the process exited and the
pool threads were silently killed by the OS. That explains the early
terminations and nonexistent stack traces.

If I keep finding crippling errors like this, I will definitely have to
purge the thread pool. It's turned out to be a fucking bug colony at
this point. I'm sick of it.

The way I see it, the root of the problem was the thread pool's control
code, which was too complicated for its own good. A surprisingly large
part of why it was overcomplicated was that it reinvented thread
joining.

So I simplified the control code by removing the detach property. Now
that the main thread joins the proper way, the validation code will not
be interrupted anymore.

This might well be the solution for #49. However, it bothers me that I
still don't have a reasonable explanation as to why the main thread
seemed to be skipping its wait.

Argh!

src/http/http.c
src/main.c
src/thread/thread_pool.c

index 45f293f6752d87c9b07609c05547fa789e878985..01cdc8bd5441c870d6d63b6e68c9b23b851aaaa8 100644 (file)
@@ -346,7 +346,7 @@ __http_download_file(struct rpki_uri *uri, long *response_code, long ims_value,
                    cond_met, log_operation, write_cb, out, is_ta);
                if (error != EREQFAILED) {
                        TA_DEBUG_MSG("%d", error);
-                       break;
+                       break; /* Note: Usually happy path */
                }
 
                TA_DEBUG_MSG("%d", error);
index cefd4c1df1094bd4b5569a7bd3a7426d36361091..c85c7bb6681a6d42a0a1aa26b123f62f1be924b4 100644 (file)
@@ -34,6 +34,7 @@ main(int argc, char **argv)
 {
        int error;
 
+       printf("Fort 1.5.0.1\n");
        /* Initializations */
 
        error = log_setup();
@@ -113,5 +114,6 @@ revert_config:
        free_rpki_config();
 revert_log:
        log_teardown();
+       PR_DEBUG_MSG("Main thread returning.");
        return error;
 }
index e3c04380f025b04a337171a2dd48ec1328a873aa..9ee8eb8af5b5f0ed598bf94b39c3c1716d5518b7 100644 (file)
@@ -56,8 +56,14 @@ struct thread_pool {
        pthread_cond_t worker2parent;
        /* Number of Working Threads. */
        unsigned int working_count;
-       /* Number of Worker Threads. */
+
+       /*
+        * Just a counter. Its use is very specific; you probably don't want
+        * to rely on this.
+        * See @thread_ids_len for what you probably actually want.
+        */
        unsigned int thread_count;
+
        /*
         * Enable to signal all threads to stop.
         * (But all ongoing tasks will be completed first.)
@@ -68,6 +74,9 @@ struct thread_pool {
         * Worker Thread to claim them.
         */
        struct task_queue queue;
+
+       pthread_t *thread_ids; /* Array. */
+       unsigned int thread_ids_len;
 };
 
 static void
@@ -195,12 +204,9 @@ tasks_poll(void *arg)
        unsigned int thread_id;
 
        mutex_lock(pool);
-       thread_id = pool->thread_count;
 
-       /* We're running; signal the Parent Thread. */
        pool->thread_count++;
-       if (pool->thread_count == 1)
-               signal_to_parent(pool);
+       thread_id = pool->thread_count;
 
        while (true) {
                while (TAILQ_EMPTY(&pool->queue) && !pool->stop)
@@ -231,13 +237,8 @@ tasks_poll(void *arg)
                        signal_to_parent(pool);
        }
 
-       /* The thread will cease to exist */
-       pool->thread_count--;
-       if (pool->thread_count == 0)
-               signal_to_parent(pool);
        mutex_unlock(pool);
-
-       pr_op_debug("Thread %s.%u: Terminating.", pool->name, thread_id);
+       pr_op_debug("Thread %s.%u: Returning.", pool->name, thread_id);
        return NULL;
 }
 
@@ -258,21 +259,29 @@ thread_pool_attr_create(pthread_attr_t *attr)
                    "Calling pthread_attr_setstacksize()");
        }
 
-       error = pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED);
-       if (error) {
-               pthread_attr_destroy(attr);
-               return pr_op_errno(error,
-                   "Calling pthread_attr_setdetachstate()");
-       }
+       /*
+        * In the original implementation, they set the threads as detached
+        * here.
+        *
+        * This led to a giant load of miserable trouble, because unless I'm
+        * missing something, they seemed to assume there was something ensuring
+        * the threads had spawned by the time the parent posted work, and also,
+        * something ensuring that the threads had died by the time the main
+        * thread started cleaning up modules. Neither of these were true at
+        * all.
+        *
+        * Between complicating the code with even more control logic, and
+        * employing joinable threads, I chose the latter. I don't think I will
+        * ever use detached threads ever again.
+        */
 
        return 0;
 }
 
 static int
-spawn_threads(struct thread_pool *pool, unsigned int threads)
+spawn_threads(struct thread_pool *pool)
 {
        pthread_attr_t attr;
-       pthread_t thread_id;
        unsigned int i;
        int error;
 
@@ -280,15 +289,15 @@ spawn_threads(struct thread_pool *pool, unsigned int threads)
        if (error)
                return error;
 
-       for (i = 0; i < threads; i++) {
-               memset(&thread_id, 0, sizeof(pthread_t));
-               error = pthread_create(&thread_id, &attr, tasks_poll, pool);
+       for (i = 0; i < pool->thread_ids_len; i++) {
+               error = pthread_create(&pool->thread_ids[i], &attr, tasks_poll,
+                   pool);
                if (error) {
                        error = pr_op_errno(error, "Spawning pool thread");
                        goto end;
                }
 
-               pr_op_debug("Pool '%s': Thread #%u spawned", pool->name, i);
+               pr_op_debug("Pool '%s': Thread #%u spawned", pool->name, i + 1);
        }
 
 end:
@@ -335,13 +344,22 @@ thread_pool_create(char const *name, unsigned int threads,
        result->stop = false;
        result->working_count = 0;
        result->thread_count = 0;
+       result->thread_ids = calloc(threads, sizeof(pthread_t));
+       if (result->thread_ids == NULL) {
+               error = pr_enomem();
+               goto free_waiting_cond;
+       }
+       result->thread_ids_len = threads;
 
-       error = spawn_threads(result, threads);
+       error = spawn_threads(result);
        if (error)
-               goto free_waiting_cond;
+               goto free_thread_ids;
 
        *pool = result;
        return 0;
+
+free_thread_ids:
+       free(result->thread_ids);
 free_waiting_cond:
        pthread_cond_destroy(&result->worker2parent);
 free_working_cond:
@@ -358,6 +376,9 @@ thread_pool_destroy(struct thread_pool *pool)
 {
        struct task_queue *queue;
        struct thread_pool_task *tmp;
+       unsigned int t;
+
+       pr_op_debug("Destroying thread pool '%s'.", pool->name);
 
        /* Remove all pending work and send the signal to stop it */
        mutex_lock(pool);
@@ -371,41 +392,16 @@ thread_pool_destroy(struct thread_pool *pool)
        pthread_cond_broadcast(&pool->parent2worker);
        mutex_unlock(pool);
 
-       /* Wait for all to end */
-       thread_pool_wait(pool);
+       for (t = 0; t < pool->thread_ids_len; t++)
+               pthread_join(pool->thread_ids[t], NULL);
+       free(pool->thread_ids);
 
        pthread_cond_destroy(&pool->worker2parent);
        pthread_cond_destroy(&pool->parent2worker);
        pthread_mutex_destroy(&pool->lock);
        free(pool);
-}
-
-/*
- * pthread_create() does not guarantee that the threads will have started by the
- * time work needs to be done.
- *
- * If they haven't started, this function will add a little timeout.
- *
- * Lock must be held.
- */
-static int validate_thread_count(struct thread_pool *pool)
-{
-       struct timespec tmout;
-       int error;
 
-       if (pool->thread_count != 0)
-               return 0;
-
-       /* Give the threads 2 more seconds */
-       clock_gettime(CLOCK_REALTIME, &tmout);
-       tmout.tv_sec += 2;
-       error = pthread_cond_timedwait(&pool->worker2parent, &pool->lock,
-           &tmout);
-
-       if (pool->thread_count != 0)
-               return 0;
-
-       return pr_op_errno(error, "Waiting thread to start");
+       pr_op_debug("Destroyed.");
 }
 
 /*
@@ -424,16 +420,13 @@ thread_pool_push(struct thread_pool *pool, char const *task_name,
                return error;
 
        mutex_lock(pool);
-
-       error = validate_thread_count(pool);
-       if (error) {
-               mutex_unlock(pool);
-               task_destroy(task);
-               return error;
-       }
-
        task_queue_push(pool, task);
        mutex_unlock(pool);
+
+       /*
+        * Note: This assumes the threads have already spawned.
+        * If not, they will claim work once they spawn anyway.
+        */
        signal_to_worker(pool);
        return 0;
 }
@@ -445,7 +438,7 @@ thread_pool_avail_threads(struct thread_pool *pool)
        bool result;
 
        mutex_lock(pool);
-       result = (pool->working_count < pool->thread_count);
+       result = (pool->working_count < pool->thread_ids_len);
        mutex_unlock(pool);
 
        return result;
@@ -457,22 +450,16 @@ thread_pool_wait(struct thread_pool *pool)
 {
        mutex_lock(pool);
 
-       while (true) {
-               pr_op_debug("- Stop: %s", pool->stop ? "true" : "false");
-               pr_op_debug("- Working count: %u", pool->working_count);
-               pr_op_debug("- Thread count: %u", pool->thread_count);
-               pr_op_debug("- Empty task queue: %s",
-                   TAILQ_EMPTY(&pool->queue) ? "true" : "false");
-
-               if (pool->stop) {
-                       /* Wait until all Working Threads are dead. */
-                       if (pool->thread_count == 0)
-                               break;
-               } else {
-                       /* Wait until all Working Threads finish. */
-                       if (pool->working_count == 0 &&
-                           TAILQ_EMPTY(&pool->queue))
-                               break;
+       /* If the pool has to stop, the wait will happen during the joins. */
+       while (!pool->stop) {
+               pr_op_debug("- Active workers: %u", pool->working_count);
+               pr_op_debug("- Task queue: %s",
+                   TAILQ_EMPTY(&pool->queue) ? "Empty" : "Not Empty");
+
+               if (pool->working_count == 0 && TAILQ_EMPTY(&pool->queue)) {
+                       pr_op_debug("Pool '%s': All work has been completed.",
+                           pool->name);
+                       break;
                }
 
                pr_op_debug("Pool '%s': Waiting for tasks to be completed",
@@ -481,5 +468,4 @@ thread_pool_wait(struct thread_pool *pool)
        }
 
        mutex_unlock(pool);
-       pr_op_debug("Pool '%s': Waiting has ended, all tasks done", pool->name);
 }