* and from https://nachtimwald.com/2019/04/12/thread-pool-in-c/
*/
-/* Task to be done by each thread */
+/*
+ * Glossary:
+ *
+ * - Worker Thread: A thread in the pool.
+ * - Working Thread: A worker thread, currently doing work.
+ * - Parent Thread: The thread that owns the pool, and wants to defer work to
+ * the worker threads.
+ * - Task: Work that will be handled by a Worker Thread.
+ */
+
+/* Task to be done by each Worker Thread. */
struct task {
thread_pool_task_cb cb;
void *arg;
TAILQ_ENTRY(task) next;
};
-/* Tasks queue (utilized as FIFO) */
+/* A collection of Tasks, used as FIFO. */
TAILQ_HEAD(task_queue, task);
struct thread_pool {
pthread_mutex_t lock;
- /* Work/wait conditions, utilized accordingly to their names */
+ /*
+ * Used by the Parent Thread to wake up Worker Threads when there's
+ * work.
+ */
pthread_cond_t working_cond;
+ /*
+ * Used by Working Threads to signal that all the work is done,
+ * for the benefit of the Parent Thread.
+ */
pthread_cond_t waiting_cond;
- /* Currently working thread */
+ /* Number of Working Threads. */
unsigned int working_count;
- /* Total number of spawned threads */
+ /* Number of Worker Threads. */
unsigned int thread_count;
- /* Use to stop all the threads */
+ /*
+ * Enable to signal all threads to stop.
+ * (But all ongoing tasks will be completed first.)
+ */
bool stop;
- /* Queue of pending tasks to attend */
+ /*
+ * Tasks registered by the Parent Thread, currently waiting for a
+ * Worker Thread to claim them.
+ */
struct task_queue queue;
};
free(task);
}
-/* Get the TAIL, remove the ref from @queue, don't forget to free the task! */
+/**
+ * Pops the tail of @queue.
+ *
+ * Freeing the task is the caller's responsibility.
+ */
static struct task *
task_queue_pull(struct task_queue *queue)
{
}
/*
- * Poll for pending tasks at the pool queue. Called by each spawned thread.
+ * This is the core Working Thread function.
*
- * Once a task is available, at least one thread of the pool will process it.
+ * In my opinion, "poll" is a bit of a misnomer. In this context, "poll"
+ * appears to mean four things:
*
- * The call ends only if the pool wishes to be stopped.
+ * 1. Wait for work.
+ * 2. Claim the work.
+ * 3. Do the work.
+ * 4. Repeat until someone asks us to stop.
*/
static void *
tasks_poll(void *arg)
struct thread_pool *pool = arg;
struct task *task;
- /* The thread has started, send the signal */
- thread_pool_lock(pool);
- pthread_cond_signal(&(pool->waiting_cond));
-
while (true) {
while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
+ /* Wait until the parent sends us work. */
pr_op_debug("Thread waiting for work...");
pthread_cond_wait(&(pool->working_cond), &(pool->lock));
}
if (pool->stop)
break;
- /* Pull the tail */
+ /* Claim the work. */
task = task_queue_pull(&(pool->queue));
pool->working_count++;
pr_op_debug("Working on task #%u", pool->working_count);
thread_pool_unlock(pool);
+ /*
+ * The null check exists because pthread_cond_signal() is
+ * technically allowed to wake up more than one thread.
+ */
if (task != NULL) {
task->cb(task->arg);
/* Now releasing the task */
thread_pool_lock(pool);
pool->working_count--;
+
+ /* If there's no more work left, signal the parent. */
if (!pool->stop && pool->working_count == 0 &&
TAILQ_EMPTY(&(pool->queue)))
pthread_cond_signal(&(pool->waiting_cond));
return NULL;
}
-/*
- * Wait a couple of seconds to be sure the thread has started and is ready to
- * work
- */
-static int
-thread_pool_thread_wait_start(struct thread_pool *pool)
-{
- struct timespec tmout = {
- .tv_sec = 0 ,
- .tv_nsec = 0
- };
- int error;
-
- /* 2 seconds to start a thread */
- clock_gettime(CLOCK_REALTIME, &tmout);
- tmout.tv_sec += 2;
-
- thread_pool_lock(pool);
- error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock),
- &tmout);
- if (error) {
- thread_pool_unlock(pool);
- return pr_op_errno(error, "Waiting thread to start");
- }
- thread_pool_unlock(pool);
-
- return 0;
-}
-
static int
thread_pool_attr_create(pthread_attr_t *attr)
{
}
static int
-tpool_thread_spawn(struct thread_pool *pool, pthread_attr_t *attr,
- thread_pool_task_cb entry_point)
+spawn_threads(struct thread_pool *pool, unsigned int threads)
{
+ pthread_attr_t attr;
pthread_t thread_id;
+ unsigned int i;
int error;
- memset(&thread_id, 0, sizeof(pthread_t));
-
- error = pthread_create(&thread_id, attr, entry_point, pool);
- if (error)
- return pr_op_errno(error, "Spawning pool thread");
-
- error = thread_pool_thread_wait_start(pool);
+ error = thread_pool_attr_create(&attr);
if (error)
return error;
- return 0;
+ for (i = 0; i < threads; i++) {
+ memset(&thread_id, 0, sizeof(pthread_t));
+ error = pthread_create(&thread_id, &attr, tasks_poll, pool);
+ if (error) {
+ error = pr_op_errno(error, "Spawning pool thread");
+ goto end;
+ }
+
+ pool->thread_count++;
+ pr_op_debug("Pool thread #%u spawned", i);
+ }
+
+end:
+ pthread_attr_destroy(&attr);
+ return error;
}
int
thread_pool_create(unsigned int threads, struct thread_pool **pool)
{
struct thread_pool *tmp;
- pthread_attr_t attr;
- unsigned int i;
int error;
tmp = malloc(sizeof(struct thread_pool));
tmp->working_count = 0;
tmp->thread_count = 0;
- error = thread_pool_attr_create(&attr);
+ error = spawn_threads(tmp, threads);
if (error)
goto free_waiting_cond;
- for (i = 0; i < threads; i++) {
- error = tpool_thread_spawn(tmp, &attr, tasks_poll);
- if (error) {
- pthread_attr_destroy(&attr);
- thread_pool_destroy(tmp);
- return error;
- }
- tmp->thread_count++;
- pr_op_debug("Pool thread #%u spawned", i);
- }
- pthread_attr_destroy(&attr);
-
*pool = tmp;
return 0;
free_waiting_cond:
pr_op_debug("- Thread count: %u", pool->thread_count);
pr_op_debug("- Empty queue: %s",
TAILQ_EMPTY(&(pool->queue)) ? "true" : "false");
- if ((!pool->stop &&
- (pool->working_count != 0 || !TAILQ_EMPTY(&(pool->queue)))) ||
- (pool->stop && pool->thread_count != 0))
- pthread_cond_wait(&(pool->waiting_cond), &(pool->lock));
- else
- break;
+
+ if (pool->stop) {
+ /* Wait until all Working Threads are dead. */
+ if (pool->thread_count == 0)
+ break;
+ } else {
+ /* Wait until all Working Threads finish. */
+ if (pool->working_count == 0 &&
+ TAILQ_EMPTY(&(pool->queue)))
+ break;
+ }
+
+ pthread_cond_wait(&(pool->waiting_cond), &(pool->lock));
}
thread_pool_unlock(pool);
pr_op_debug("Waiting has ended, all tasks have finished");