*/
/* Task to be done by each Worker Thread. */
-struct task {
+struct thread_pool_task {
+ /*
+ * Debugging purposes only. Uniqueness is not a requirement.
+ * Will not be released by task_destroy().
+ */
+ char const *name;
thread_pool_task_cb cb;
void *arg;
- TAILQ_ENTRY(task) next;
+ TAILQ_ENTRY(thread_pool_task) next;
};
/* A collection of Tasks, used as FIFO. */
-TAILQ_HEAD(task_queue, task);
+TAILQ_HEAD(task_queue, thread_pool_task);
struct thread_pool {
+ /*
+ * Debugging purposes only. Uniqueness is not a requirement.
+ * Will not be released by thread_pool_destroy().
+ */
+ char const *name;
pthread_mutex_t lock;
/*
* Used by the Parent Thread to wake up Worker Threads when there's
}
static int
-task_create(thread_pool_task_cb cb, void *arg, struct task **result)
+task_create(char const *name, thread_pool_task_cb cb, void *arg,
+ struct thread_pool_task **out)
{
- struct task *tmp;
+ struct thread_pool_task *task;
- tmp = malloc(sizeof(struct task));
- if (tmp == NULL)
+ task = malloc(sizeof(struct thread_pool_task));
+ if (task == NULL)
return pr_enomem();
- tmp->cb = cb;
- tmp->arg = arg;
+ task->name = name;
+ task->cb = cb;
+ task->arg = arg;
- *result = tmp;
+ *out = task;
return 0;
}
static void
-task_destroy(struct task *task)
+task_destroy(struct thread_pool_task *task)
{
free(task);
}
/**
- * Pops the tail of @queue.
+ * Pops the tail of @queue. pthread_cond_signal() is technically allowed to wake
+ * more than one thread, so please keep in mind that the result might be NULL.
*
* Freeing the task is the caller's responsibility.
*/
-static struct task *
-task_queue_pull(struct task_queue *queue)
+static struct thread_pool_task *
+task_queue_pull(struct thread_pool *pool, unsigned int thread_id)
{
- struct task *tmp;
-
- tmp = TAILQ_LAST(queue, task_queue);
- TAILQ_REMOVE(queue, tmp, next);
- pr_op_debug("Pulling a task from the pool");
+ struct thread_pool_task *task;
+
+ task = TAILQ_LAST(&pool->queue, task_queue);
+ if (task != NULL) {
+ TAILQ_REMOVE(&pool->queue, task, next);
+ pr_op_debug("Thread %s.%u: Claimed task '%s'", pool->name,
+ thread_id, task->name);
+ } else {
+ pr_op_debug("Thread %s.%u: Claimed nothing", pool->name,
+ thread_id);
+ }
- return tmp;
+ return task;
}
/* Insert the task at the HEAD */
static void
-task_queue_push(struct task_queue *queue, struct task *task)
+task_queue_push(struct thread_pool *pool, struct thread_pool_task *task)
{
- TAILQ_INSERT_HEAD(queue, task, next);
- pr_op_debug("Pushing a task to the pool");
+ TAILQ_INSERT_HEAD(&pool->queue, task, next);
+ pr_op_debug("Pool '%s': Pushed task '%s'", pool->name, task->name);
}
/*
tasks_poll(void *arg)
{
struct thread_pool *pool = arg;
- struct task *task;
+ struct thread_pool_task *task;
+ unsigned int thread_id;
thread_pool_lock(pool);
+ thread_id = pool->thread_count;
+
/* We're running; signal the Parent Thread. */
pool->thread_count++;
pthread_cond_signal(&(pool->waiting_cond));
while (true) {
while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
/* Wait until the parent sends us work. */
- pr_op_debug("Thread waiting for work...");
+ pr_op_debug("Thread %s.%u: Waiting for work...",
+ pool->name, thread_id);
pthread_cond_wait(&(pool->working_cond), &(pool->lock));
}
break;
/* Claim the work. */
- task = task_queue_pull(&(pool->queue));
+ task = task_queue_pull(pool, thread_id);
pool->working_count++;
- pr_op_debug("Working on task #%u", pool->working_count);
thread_pool_unlock(pool);
- /*
- * The null check exists because pthread_cond_signal() is
- * technically allowed to wake up more than one thread.
- */
if (task != NULL) {
task->cb(task->arg);
+ pr_op_debug("Thread %s.%u: Task '%s' ended", pool->name,
+ thread_id, task->name);
/* Now releasing the task */
task_destroy(task);
- pr_op_debug("Task ended");
}
thread_pool_lock(pool);
pthread_cond_signal(&(pool->waiting_cond));
thread_pool_unlock(pool);
+ pr_op_debug("Thread %s.%u: Terminating.", pool->name, thread_id);
return NULL;
}
goto end;
}
- pr_op_debug("Pool thread #%u spawned", i);
+ pr_op_debug("Pool '%s': Thread #%u spawned", pool->name, i);
}
end:
}
int
-thread_pool_create(unsigned int threads, struct thread_pool **pool)
+thread_pool_create(char const *name, unsigned int threads,
+ struct thread_pool **pool)
{
- struct thread_pool *tmp;
+ struct thread_pool *result;
int error;
- tmp = malloc(sizeof(struct thread_pool));
- if (tmp == NULL)
+ result = malloc(sizeof(struct thread_pool));
+ if (result == NULL)
return pr_enomem();
/* Init locking */
- error = pthread_mutex_init(&(tmp->lock), NULL);
+ error = pthread_mutex_init(&(result->lock), NULL);
if (error) {
error = pr_op_errno(error, "Calling pthread_mutex_init()");
goto free_tmp;
}
/* Init conditional to signal pending work */
- error = pthread_cond_init(&(tmp->working_cond), NULL);
+ error = pthread_cond_init(&(result->working_cond), NULL);
if (error) {
error = pr_op_errno(error,
"Calling pthread_cond_init() at working condition");
}
/* Init conditional to signal no pending work */
- error = pthread_cond_init(&(tmp->waiting_cond), NULL);
+ error = pthread_cond_init(&(result->waiting_cond), NULL);
if (error) {
error = pr_op_errno(error,
"Calling pthread_cond_init() at waiting condition");
goto free_working_cond;
}
- TAILQ_INIT(&(tmp->queue));
- tmp->stop = false;
- tmp->working_count = 0;
- tmp->thread_count = 0;
+ TAILQ_INIT(&(result->queue));
+ result->name = name;
+ result->stop = false;
+ result->working_count = 0;
+ result->thread_count = 0;
- error = spawn_threads(tmp, threads);
+ error = spawn_threads(result, threads);
if (error)
goto free_waiting_cond;
- *pool = tmp;
+ *pool = result;
return 0;
free_waiting_cond:
- pthread_cond_destroy(&(tmp->waiting_cond));
+ pthread_cond_destroy(&(result->waiting_cond));
free_working_cond:
- pthread_cond_destroy(&(tmp->working_cond));
+ pthread_cond_destroy(&(result->working_cond));
free_mutex:
- pthread_mutex_destroy(&(tmp->lock));
+ pthread_mutex_destroy(&(result->lock));
free_tmp:
- free(tmp);
+ free(result);
return error;
}
thread_pool_destroy(struct thread_pool *pool)
{
struct task_queue *queue;
- struct task *tmp;
+ struct thread_pool_task *tmp;
/* Remove all pending work and send the signal to stop it */
thread_pool_lock(pool);
* @arg.
*/
int
-thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg)
+thread_pool_push(struct thread_pool *pool, char const *task_name,
+ thread_pool_task_cb cb, void *arg)
{
- struct task *task;
+ struct thread_pool_task *task;
int error;
task = NULL;
- error = task_create(cb, arg, &task);
+ error = task_create(task_name, cb, arg, &task);
if (error)
return error;
goto end;
}
- task_queue_push(&(pool->queue), task);
+ task_queue_push(pool, task);
/* There's work to do! */
pthread_cond_signal(&(pool->working_cond));
{
thread_pool_lock(pool);
while (true) {
- pr_op_debug("Waiting all tasks from the pool to end");
pr_op_debug("- Stop: %s", pool->stop ? "true" : "false");
pr_op_debug("- Working count: %u", pool->working_count);
pr_op_debug("- Thread count: %u", pool->thread_count);
- pr_op_debug("- Empty queue: %s",
+ pr_op_debug("- Empty task queue: %s",
TAILQ_EMPTY(&(pool->queue)) ? "true" : "false");
if (pool->stop) {
break;
}
+ pr_op_debug("Pool '%s': Waiting for tasks to be completed",
+ pool->name);
pthread_cond_wait(&(pool->waiting_cond), &(pool->lock));
}
thread_pool_unlock(pool);
- pr_op_debug("Waiting has ended, all tasks have finished");
+ pr_op_debug("Pool '%s': Waiting has ended, all tasks done",
+ pool->name);
}