]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Thread pool: remove thread ready signal
authorAlberto Leiva Popper <ydahhrk@gmail.com>
Wed, 7 Apr 2021 20:11:44 +0000 (15:11 -0500)
committerAlberto Leiva Popper <ydahhrk@gmail.com>
Wed, 7 Apr 2021 20:16:02 +0000 (15:16 -0500)
It seems the workers were sending the "ready" signal before the
parent thread started to wait for it. This lead to timeouts.

The parent thread doesn't really need to wait for the worker
threads to carry on with its own work, so I've decided to remove
the check.

Progress on #49.

src/thread/thread_pool.c

index 7a5e19255bc9d814283dec7272ab58b1148ae57d..bc706c712d067772e039b9eec168cb31c5fc2c4c 100644 (file)
  * and from https://nachtimwald.com/2019/04/12/thread-pool-in-c/
  */
 
-/* Task to be done by each thread */
+/*
+ * Glossary:
+ *
+ * - Worker Thread: A thread in the pool.
+ * - Working Thread: A worker thread, currently doing work.
+ * - Parent Thread: The thread that owns the pool, and wants to defer work to
+ *   the worker threads.
+ * - Task: Work that will be handled by a Worker Thread.
+ */
+
+/* Task to be done by each Worker Thread. */
 struct task {
        thread_pool_task_cb cb;
        void *arg;
        TAILQ_ENTRY(task) next;
 };
 
-/* Tasks queue (utilized as FIFO) */
+/* A collection of Tasks, used as FIFO. */
 TAILQ_HEAD(task_queue, task);
 
 struct thread_pool {
        pthread_mutex_t lock;
-       /* Work/wait conditions, utilized accordingly to their names */
+       /*
+        * Used by the Parent Thread to wake up Worker Threads when there's
+        * work.
+        */
        pthread_cond_t working_cond;
+       /*
+        * Used by Working Threads to signal that all the work is done,
+        * for the benefit of the Parent Thread.
+        */
        pthread_cond_t waiting_cond;
-       /* Currently working thread */
+       /* Number of Working Threads. */
        unsigned int working_count;
-       /* Total number of spawned threads */
+       /* Number of Worker Threads. */
        unsigned int thread_count;
-       /* Use to stop all the threads */
+       /*
+        * Enable to signal all threads to stop.
+        * (But all ongoing tasks will be completed first.)
+        */
        bool stop;
-       /* Queue of pending tasks to attend */
+       /*
+        * Tasks registered by the Parent Thread, currently waiting for a
+        * Worker Thread to claim them.
+        */
        struct task_queue queue;
 };
 
@@ -80,7 +103,11 @@ task_destroy(struct task *task)
        free(task);
 }
 
-/* Get the TAIL, remove the ref from @queue, don't forget to free the task! */
+/**
+ * Pops the tail of @queue.
+ *
+ * Freeing the task is the caller's responsibility.
+ */
 static struct task *
 task_queue_pull(struct task_queue *queue)
 {
@@ -102,11 +129,15 @@ task_queue_push(struct task_queue *queue, struct task *task)
 }
 
 /*
- * Poll for pending tasks at the pool queue. Called by each spawned thread.
+ * This is the core Working Thread function.
  *
- * Once a task is available, at least one thread of the pool will process it.
+ * In my opinion, "poll" is a bit of a misnomer. In this context, "poll"
+ * appears to mean four things:
  *
- * The call ends only if the pool wishes to be stopped.
+ * 1. Wait for work.
+ * 2. Claim the work.
+ * 3. Do the work.
+ * 4. Repeat until someone asks us to stop.
  */
 static void *
 tasks_poll(void *arg)
@@ -114,12 +145,9 @@ tasks_poll(void *arg)
        struct thread_pool *pool = arg;
        struct task *task;
 
-       /* The thread has started, send the signal */
-       thread_pool_lock(pool);
-       pthread_cond_signal(&(pool->waiting_cond));
-
        while (true) {
                while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
+                       /* Wait until the parent sends us work. */
                        pr_op_debug("Thread waiting for work...");
                        pthread_cond_wait(&(pool->working_cond), &(pool->lock));
                }
@@ -127,12 +155,16 @@ tasks_poll(void *arg)
                if (pool->stop)
                        break;
 
-               /* Pull the tail */
+               /* Claim the work. */
                task = task_queue_pull(&(pool->queue));
                pool->working_count++;
                pr_op_debug("Working on task #%u", pool->working_count);
                thread_pool_unlock(pool);
 
+               /*
+                * The null check exists because pthread_cond_signal() is
+                * technically allowed to wake up more than one thread.
+                */
                if (task != NULL) {
                        task->cb(task->arg);
                        /* Now releasing the task */
@@ -142,6 +174,8 @@ tasks_poll(void *arg)
 
                thread_pool_lock(pool);
                pool->working_count--;
+
+               /* If there's no more work left, signal the parent. */
                if (!pool->stop && pool->working_count == 0 &&
                    TAILQ_EMPTY(&(pool->queue)))
                        pthread_cond_signal(&(pool->waiting_cond));
@@ -155,35 +189,6 @@ tasks_poll(void *arg)
        return NULL;
 }
 
-/*
- * Wait a couple of seconds to be sure the thread has started and is ready to
- * work
- */
-static int
-thread_pool_thread_wait_start(struct thread_pool *pool)
-{
-       struct timespec tmout = {
-           .tv_sec = 0 ,
-           .tv_nsec = 0
-       };
-       int error;
-
-       /* 2 seconds to start a thread */
-       clock_gettime(CLOCK_REALTIME, &tmout);
-       tmout.tv_sec += 2;
-
-       thread_pool_lock(pool);
-       error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock),
-           &tmout);
-       if (error) {
-               thread_pool_unlock(pool);
-               return pr_op_errno(error, "Waiting thread to start");
-       }
-       thread_pool_unlock(pool);
-
-       return 0;
-}
-
 static int
 thread_pool_attr_create(pthread_attr_t *attr)
 {
@@ -212,31 +217,38 @@ thread_pool_attr_create(pthread_attr_t *attr)
 }
 
 static int
-tpool_thread_spawn(struct thread_pool *pool, pthread_attr_t *attr,
-    thread_pool_task_cb entry_point)
+spawn_threads(struct thread_pool *pool, unsigned int threads)
 {
+       pthread_attr_t attr;
        pthread_t thread_id;
+       unsigned int i;
        int error;
 
-       memset(&thread_id, 0, sizeof(pthread_t));
-
-       error = pthread_create(&thread_id, attr, entry_point, pool);
-       if (error)
-               return pr_op_errno(error, "Spawning pool thread");
-
-       error = thread_pool_thread_wait_start(pool);
+       error = thread_pool_attr_create(&attr);
        if (error)
                return error;
 
-       return 0;
+       for (i = 0; i < threads; i++) {
+               memset(&thread_id, 0, sizeof(pthread_t));
+               error = pthread_create(&thread_id, &attr, tasks_poll, pool);
+               if (error) {
+                       error = pr_op_errno(error, "Spawning pool thread");
+                       goto end;
+               }
+
+               pool->thread_count++;
+               pr_op_debug("Pool thread #%u spawned", i);
+       }
+
+end:
+       pthread_attr_destroy(&attr);
+       return error;
 }
 
 int
 thread_pool_create(unsigned int threads, struct thread_pool **pool)
 {
        struct thread_pool *tmp;
-       pthread_attr_t attr;
-       unsigned int i;
        int error;
 
        tmp = malloc(sizeof(struct thread_pool));
@@ -271,22 +283,10 @@ thread_pool_create(unsigned int threads, struct thread_pool **pool)
        tmp->working_count = 0;
        tmp->thread_count = 0;
 
-       error = thread_pool_attr_create(&attr);
+       error = spawn_threads(tmp, threads);
        if (error)
                goto free_waiting_cond;
 
-       for (i = 0; i < threads; i++) {
-               error = tpool_thread_spawn(tmp, &attr, tasks_poll);
-               if (error) {
-                       pthread_attr_destroy(&attr);
-                       thread_pool_destroy(tmp);
-                       return error;
-               }
-               tmp->thread_count++;
-               pr_op_debug("Pool thread #%u spawned", i);
-       }
-       pthread_attr_destroy(&attr);
-
        *pool = tmp;
        return 0;
 free_waiting_cond:
@@ -376,12 +376,19 @@ thread_pool_wait(struct thread_pool *pool)
                pr_op_debug("- Thread count: %u", pool->thread_count);
                pr_op_debug("- Empty queue: %s",
                    TAILQ_EMPTY(&(pool->queue)) ? "true" : "false");
-               if ((!pool->stop &&
-                   (pool->working_count != 0 || !TAILQ_EMPTY(&(pool->queue)))) ||
-                   (pool->stop && pool->thread_count != 0))
-                       pthread_cond_wait(&(pool->waiting_cond), &(pool->lock));
-               else
-                       break;
+
+               if (pool->stop) {
+                       /* Wait until all Working Threads are dead. */
+                       if (pool->thread_count == 0)
+                               break;
+               } else {
+                       /* Wait until all Working Threads finish. */
+                       if (pool->working_count == 0 &&
+                           TAILQ_EMPTY(&(pool->queue)))
+                               break;
+               }
+
+               pthread_cond_wait(&(pool->waiting_cond), &(pool->lock));
        }
        thread_pool_unlock(pool);
        pr_op_debug("Waiting has ended, all tasks have finished");