tmp = TAILQ_LAST(queue, task_queue);
TAILQ_REMOVE(queue, tmp, next);
+ pr_op_debug("Pulling a task from the pool");
return tmp;
}
task_queue_push(struct task_queue *queue, struct task *task)
{
TAILQ_INSERT_HEAD(queue, task, next);
+ pr_op_debug("Pushing a task to the pool");
}
/*
/* The thread has started, send the signal */
thread_pool_lock(pool);
pthread_cond_signal(&(pool->waiting_cond));
- thread_pool_unlock(pool);
while (true) {
- thread_pool_lock(pool);
-
- while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop)
+ while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
+ pr_op_debug("Thread waiting for work...");
pthread_cond_wait(&(pool->working_cond), &(pool->lock));
+ }
if (pool->stop)
break;
if (!pool->stop && pool->working_count == 0 &&
TAILQ_EMPTY(&(pool->queue)))
pthread_cond_signal(&(pool->waiting_cond));
-
- thread_pool_unlock(pool);
}
/* The thread will cease to exist */
clock_gettime(CLOCK_REALTIME, &tmout);
tmout.tv_sec += 2;
+ thread_pool_lock(pool);
error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock),
&tmout);
- if (error)
+ if (error) {
+ thread_pool_unlock(pool);
return pr_op_errno(error, "Waiting thread to start");
+ }
+ thread_pool_unlock(pool);
return 0;
}
static int
-tpool_thread_spawn(struct thread_pool *pool, thread_pool_task_cb entry_point)
+thread_pool_attr_create(pthread_attr_t *attr)
{
- pthread_attr_t attr;
- pthread_t thread_id;
int error;
- memset(&thread_id, 0, sizeof(pthread_t));
-
- error = pthread_attr_init(&attr);
+ error = pthread_attr_init(attr);
if (error)
return pr_op_errno(error, "Calling pthread_attr_init()");
/* Use 2MB (default in most 64 bits systems) */
- error = pthread_attr_setstacksize(&attr, 1024 * 1024 * 2);
- if (error)
+ error = pthread_attr_setstacksize(attr, 1024 * 1024 * 2);
+ if (error) {
+ pthread_attr_destroy(attr);
return pr_op_errno(error,
"Calling pthread_attr_setstacksize()");
+ }
- error = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (error)
+ error = pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED);
+ if (error) {
+ pthread_attr_destroy(attr);
return pr_op_errno(error,
"Calling pthread_attr_setdetachstate()");
+ }
- thread_pool_lock(pool);
- error = pthread_create(&thread_id, &attr, entry_point, pool);
- pthread_attr_destroy(&attr);
- if (error) {
- thread_pool_unlock(pool);
+ return 0;
+}
+
+static int
+tpool_thread_spawn(struct thread_pool *pool, pthread_attr_t *attr,
+ thread_pool_task_cb entry_point)
+{
+ pthread_t thread_id;
+ int error;
+
+ memset(&thread_id, 0, sizeof(pthread_t));
+
+ error = pthread_create(&thread_id, attr, entry_point, pool);
+ if (error)
return pr_op_errno(error, "Spawning pool thread");
- }
error = thread_pool_thread_wait_start(pool);
- if (error) {
- thread_pool_unlock(pool);
+ if (error)
return error;
- }
- thread_pool_unlock(pool);
return 0;
}
thread_pool_create(unsigned int threads, struct thread_pool **pool)
{
struct thread_pool *tmp;
+ pthread_attr_t attr;
unsigned int i;
int error;
TAILQ_INIT(&(tmp->queue));
tmp->stop = false;
tmp->working_count = 0;
- tmp->thread_count = threads;
+ tmp->thread_count = 0;
+
+ error = thread_pool_attr_create(&attr);
+ if (error)
+ goto free_waiting_cond;
for (i = 0; i < threads; i++) {
- error = tpool_thread_spawn(tmp, tasks_poll);
+ error = tpool_thread_spawn(tmp, &attr, tasks_poll);
if (error) {
+ pthread_attr_destroy(&attr);
thread_pool_destroy(tmp);
return error;
}
+ tmp->thread_count++;
pr_op_debug("Pool thread #%u spawned", i);
}
+ pthread_attr_destroy(&attr);
*pool = tmp;
return 0;
+free_waiting_cond:
+ pthread_cond_destroy(&(tmp->waiting_cond));
free_working_cond:
pthread_cond_destroy(&(tmp->working_cond));
free_mutex:
thread_pool_lock(pool);
task_queue_push(&(pool->queue), task);
- thread_pool_unlock(pool);
-
/* There's work to do! */
- pthread_cond_broadcast(&(pool->working_cond));
+ pthread_cond_signal(&(pool->working_cond));
+ thread_pool_unlock(pool);
return 0;
}
-/* Are there available threads to work? */
+/* There are available threads to work? */
bool
thread_pool_avail_threads(struct thread_pool *pool)
{
thread_pool_lock(pool);
while (true) {
pr_op_debug("Waiting all tasks from the pool to end");
- if ((!pool->stop && pool->working_count != 0) ||
+ pr_op_debug("- Stop: %s", pool->stop ? "true" : "false");
+ pr_op_debug("- Working count: %u", pool->working_count);
+ pr_op_debug("- Thread count: %u", pool->thread_count);
+ pr_op_debug("- Empty queue: %s",
+ TAILQ_EMPTY(&(pool->queue)) ? "true" : "false");
+ if ((!pool->stop &&
+ (pool->working_count != 0 || !TAILQ_EMPTY(&(pool->queue)))) ||
(pool->stop && pool->thread_count != 0))
pthread_cond_wait(&(pool->waiting_cond), &(pool->lock));
else