* Used by the Parent Thread to wake up Worker Threads when there's
* work.
*/
- pthread_cond_t working_cond;
+ pthread_cond_t parent2worker;
/*
* Used by Working Threads to signal the Parent Thread that workers are
* ready (during initialization) or that all the work is done (after
* initialization).
*/
- pthread_cond_t waiting_cond;
+ pthread_cond_t worker2parent;
/* Number of Working Threads. */
unsigned int working_count;
/* Number of Worker Threads. */
};
static void
-thread_pool_lock(struct thread_pool *pool)
+panic_on_fail(int error, char const *function_name)
{
- int error;
-
- error = pthread_mutex_lock(&(pool->lock));
if (error)
- pr_crit("pthread_mutex_lock() returned error code %d. This is too critical for a graceful recovery; I must die now.",
- error);
+ pr_crit("%s() returned error code %d. This is too critical for a graceful recovery; I must die now.",
+ function_name, error);
}
static void
-thread_pool_unlock(struct thread_pool *pool)
+mutex_lock(struct thread_pool *pool)
{
- int error;
+ panic_on_fail(pthread_mutex_lock(&pool->lock), "pthread_mutex_lock");
+}
- error = pthread_mutex_unlock(&(pool->lock));
- if (error)
- pr_crit("pthread_mutex_unlock() returned error code %d. This is too critical for a graceful recovery; I must die now.",
- error);
+static void
+mutex_unlock(struct thread_pool *pool)
+{
+ panic_on_fail(pthread_mutex_unlock(&pool->lock), "pthread_mutex_unlock");
+}
+
+/* Wait until the parent sends us work. */
+static void
+wait_for_parent_signal(struct thread_pool *pool, unsigned int thread_id)
+{
+ pr_op_debug("Thread %s.%u: Waiting for work...", pool->name, thread_id);
+ panic_on_fail(pthread_cond_wait(&pool->parent2worker, &pool->lock),
+ "pthread_cond_wait");
+}
+
+static void
+signal_to_parent(struct thread_pool *pool)
+{
+ panic_on_fail(pthread_cond_signal(&pool->worker2parent));
+}
+
+static void
+wait_for_worker_signal(struct thread_pool *pool)
+{
+ panic_on_fail(pthread_cond_wait(&pool->worker2parent, &pool->lock),
+ "pthread_cond_wait");
+}
+
+static void
+signal_to_worker(struct thread_pool *pool)
+{
+ panic_on_fail(pthread_cond_signal(&pool->parent2worker));
}
static int
struct thread_pool_task *task;
unsigned int thread_id;
- thread_pool_lock(pool);
+ mutex_lock(pool);
thread_id = pool->thread_count;
/* We're running; signal the Parent Thread. */
pool->thread_count++;
- pthread_cond_signal(&(pool->waiting_cond));
+ if (pool->thread_count == 1)
+ signal_to_parent(pool);
while (true) {
- while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
- /* Wait until the parent sends us work. */
- pr_op_debug("Thread %s.%u: Waiting for work...",
- pool->name, thread_id);
- pthread_cond_wait(&(pool->working_cond), &(pool->lock));
- }
+ while (TAILQ_EMPTY(&pool->queue) && !pool->stop)
+ wait_for_parent_signal(pool, thread_id);
if (pool->stop)
break;
/* Claim the work. */
task = task_queue_pull(pool, thread_id);
pool->working_count++;
- thread_pool_unlock(pool);
+ mutex_unlock(pool);
if (task != NULL) {
task->cb(task->arg);
pr_op_debug("Thread %s.%u: Task '%s' ended", pool->name,
thread_id, task->name);
- /* Now releasing the task */
task_destroy(task);
}
- thread_pool_lock(pool);
+ mutex_lock(pool);
pool->working_count--;
- /* If there's no more work left, signal the parent. */
- if (!pool->stop && pool->working_count == 0 &&
- TAILQ_EMPTY(&(pool->queue)))
- pthread_cond_signal(&(pool->waiting_cond));
+ if (pool->stop)
+ break;
+ /* If there's no more work left, wake up parent. */
+ if (pool->working_count == 0 && TAILQ_EMPTY(&pool->queue))
+ signal_to_parent(pool);
}
/* The thread will cease to exist */
pool->thread_count--;
- pthread_cond_signal(&(pool->waiting_cond));
- thread_pool_unlock(pool);
+ if (pool->thread_count == 0)
+ signal_to_parent(pool);
+ mutex_unlock(pool);
pr_op_debug("Thread %s.%u: Terminating.", pool->name, thread_id);
return NULL;
return pr_enomem();
/* Init locking */
- error = pthread_mutex_init(&(result->lock), NULL);
+ error = pthread_mutex_init(&result->lock, NULL);
if (error) {
error = pr_op_errno(error, "Calling pthread_mutex_init()");
goto free_tmp;
}
/* Init conditional to signal pending work */
- error = pthread_cond_init(&(result->working_cond), NULL);
+ error = pthread_cond_init(&result->parent2worker, NULL);
if (error) {
error = pr_op_errno(error,
"Calling pthread_cond_init() at working condition");
}
/* Init conditional to signal no pending work */
- error = pthread_cond_init(&(result->waiting_cond), NULL);
+ error = pthread_cond_init(&result->worker2parent, NULL);
if (error) {
error = pr_op_errno(error,
"Calling pthread_cond_init() at waiting condition");
goto free_working_cond;
}
- TAILQ_INIT(&(result->queue));
+ TAILQ_INIT(&result->queue);
result->name = name;
result->stop = false;
result->working_count = 0;
*pool = result;
return 0;
free_waiting_cond:
- pthread_cond_destroy(&(result->waiting_cond));
+ pthread_cond_destroy(&result->worker2parent);
free_working_cond:
- pthread_cond_destroy(&(result->working_cond));
+ pthread_cond_destroy(&result->parent2worker);
free_mutex:
- pthread_mutex_destroy(&(result->lock));
+ pthread_mutex_destroy(&result->lock);
free_tmp:
free(result);
return error;
struct thread_pool_task *tmp;
/* Remove all pending work and send the signal to stop it */
- thread_pool_lock(pool);
+ mutex_lock(pool);
queue = &(pool->queue);
while (!TAILQ_EMPTY(queue)) {
tmp = TAILQ_FIRST(queue);
task_destroy(tmp);
}
pool->stop = true;
- pthread_cond_broadcast(&(pool->working_cond));
- thread_pool_unlock(pool);
+ pthread_cond_broadcast(&pool->parent2worker);
+ mutex_unlock(pool);
/* Wait for all to end */
thread_pool_wait(pool);
- pthread_cond_destroy(&(pool->waiting_cond));
- pthread_cond_destroy(&(pool->working_cond));
- pthread_mutex_destroy(&(pool->lock));
+ pthread_cond_destroy(&pool->worker2parent);
+ pthread_cond_destroy(&pool->parent2worker);
+ pthread_mutex_destroy(&pool->lock);
free(pool);
}
if (pool->thread_count != 0)
return 0;
- /* 2 seconds to start a thread */
+ /* Give the threads 2 more seconds */
clock_gettime(CLOCK_REALTIME, &tmout);
tmout.tv_sec += 2;
- error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock),
+ error = pthread_cond_timedwait(&pool->worker2parent, &pool->lock,
&tmout);
if (pool->thread_count != 0)
struct thread_pool_task *task;
int error;
- task = NULL;
error = task_create(task_name, cb, arg, &task);
if (error)
return error;
- thread_pool_lock(pool);
+ mutex_lock(pool);
error = validate_thread_count(pool);
if (error) {
+ mutex_unlock(pool);
task_destroy(task);
- goto end;
+ return error;
}
task_queue_push(pool, task);
- /* There's work to do! */
- pthread_cond_signal(&(pool->working_cond));
-
-end:
- thread_pool_unlock(pool);
- return error;
+ mutex_unlock(pool);
+ signal_to_worker(pool);
+ return 0;
}
/* There are available threads to work? */
{
bool result;
- thread_pool_lock(pool);
+ mutex_lock(pool);
result = (pool->working_count < pool->thread_count);
- thread_pool_unlock(pool);
+ mutex_unlock(pool);
return result;
}
void
thread_pool_wait(struct thread_pool *pool)
{
- thread_pool_lock(pool);
+ mutex_lock(pool);
+
while (true) {
pr_op_debug("- Stop: %s", pool->stop ? "true" : "false");
pr_op_debug("- Working count: %u", pool->working_count);
pr_op_debug("- Thread count: %u", pool->thread_count);
pr_op_debug("- Empty task queue: %s",
- TAILQ_EMPTY(&(pool->queue)) ? "true" : "false");
+ TAILQ_EMPTY(&pool->queue) ? "true" : "false");
if (pool->stop) {
/* Wait until all Working Threads are dead. */
} else {
/* Wait until all Working Threads finish. */
if (pool->working_count == 0 &&
- TAILQ_EMPTY(&(pool->queue)))
+ TAILQ_EMPTY(&pool->queue))
break;
}
pr_op_debug("Pool '%s': Waiting for tasks to be completed",
pool->name);
- pthread_cond_wait(&(pool->waiting_cond), &(pool->lock));
+ wait_for_worker_signal(pool);
}
- thread_pool_unlock(pool);
- pr_op_debug("Pool '%s': Waiting has ended, all tasks done",
- pool->name);
+
+ mutex_unlock(pool);
+ pr_op_debug("Pool '%s': Waiting has ended, all tasks done", pool->name);
}