pthread_cond_t worker2parent;
/* Number of Working Threads. */
unsigned int working_count;
- /* Number of Worker Threads. */
+
+ /*
+ * Just a counter. Its use is very specific; you probably don't want
+ * to rely on this.
+ * See @thread_ids_len for what you probably actually want.
+ */
unsigned int thread_count;
+
/*
* Enable to signal all threads to stop.
* (But all ongoing tasks will be completed first.)
* Worker Thread to claim them.
*/
struct task_queue queue;
+
+ pthread_t *thread_ids; /* Array. */
+ unsigned int thread_ids_len;
};
static void
unsigned int thread_id;
mutex_lock(pool);
- thread_id = pool->thread_count;
- /* We're running; signal the Parent Thread. */
pool->thread_count++;
- if (pool->thread_count == 1)
- signal_to_parent(pool);
+ thread_id = pool->thread_count;
while (true) {
while (TAILQ_EMPTY(&pool->queue) && !pool->stop)
signal_to_parent(pool);
}
- /* The thread will cease to exist */
- pool->thread_count--;
- if (pool->thread_count == 0)
- signal_to_parent(pool);
mutex_unlock(pool);
-
- pr_op_debug("Thread %s.%u: Terminating.", pool->name, thread_id);
+ pr_op_debug("Thread %s.%u: Returning.", pool->name, thread_id);
return NULL;
}
"Calling pthread_attr_setstacksize()");
}
- error = pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED);
- if (error) {
- pthread_attr_destroy(attr);
- return pr_op_errno(error,
- "Calling pthread_attr_setdetachstate()");
- }
+ /*
+ * In the original implementation, they set the threads as detached
+ * here.
+ *
+ * This lead to giant load of miserable trouble, because unless I'm
+ * missing something, they seemed to assume there was something ensuring
+ * the threads had spawned by the time the parent posted work, and also,
+ * something ensuring that the threads had died by the time the main
+ * thread started cleaning up modules. Neither of these were true at
+ * all.
+ *
+ * Between complicating the code with even more control logic, and
+ * employing joinable threads, I chose the latter. I don't think I will
+ * ever use detached threads ever again.
+ */
return 0;
}
static int
-spawn_threads(struct thread_pool *pool, unsigned int threads)
+spawn_threads(struct thread_pool *pool)
{
pthread_attr_t attr;
- pthread_t thread_id;
unsigned int i;
int error;
if (error)
return error;
- for (i = 0; i < threads; i++) {
- memset(&thread_id, 0, sizeof(pthread_t));
- error = pthread_create(&thread_id, &attr, tasks_poll, pool);
+ for (i = 0; i < pool->thread_ids_len; i++) {
+ error = pthread_create(&pool->thread_ids[i], &attr, tasks_poll,
+ pool);
if (error) {
error = pr_op_errno(error, "Spawning pool thread");
goto end;
}
- pr_op_debug("Pool '%s': Thread #%u spawned", pool->name, i);
+ pr_op_debug("Pool '%s': Thread #%u spawned", pool->name, i + 1);
}
end:
result->stop = false;
result->working_count = 0;
result->thread_count = 0;
+ result->thread_ids = calloc(threads, sizeof(pthread_t));
+ if (result->thread_ids == NULL) {
+ error = pr_enomem();
+ goto free_waiting_cond;
+ }
+ result->thread_ids_len = threads;
- error = spawn_threads(result, threads);
+ error = spawn_threads(result);
if (error)
- goto free_waiting_cond;
+ goto free_thread_ids;
*pool = result;
return 0;
+
+free_thread_ids:
+ free(result->thread_ids);
free_waiting_cond:
pthread_cond_destroy(&result->worker2parent);
free_working_cond:
{
struct task_queue *queue;
struct thread_pool_task *tmp;
+ unsigned int t;
+
+ pr_op_debug("Destroying thread pool '%s'.", pool->name);
/* Remove all pending work and send the signal to stop it */
mutex_lock(pool);
pthread_cond_broadcast(&pool->parent2worker);
mutex_unlock(pool);
- /* Wait for all to end */
- thread_pool_wait(pool);
+ for (t = 0; t < pool->thread_ids_len; t++)
+ pthread_join(pool->thread_ids[t], NULL);
+ free(pool->thread_ids);
pthread_cond_destroy(&pool->worker2parent);
pthread_cond_destroy(&pool->parent2worker);
pthread_mutex_destroy(&pool->lock);
free(pool);
-}
-
-/*
- * pthread_create() does not guarantee that the threads will have started by the
- * time work needs to be done.
- *
- * If they haven't started, this function will add a little timeout.
- *
- * Lock must be held.
- */
-static int validate_thread_count(struct thread_pool *pool)
-{
- struct timespec tmout;
- int error;
- if (pool->thread_count != 0)
- return 0;
-
- /* Give the threads 2 more seconds */
- clock_gettime(CLOCK_REALTIME, &tmout);
- tmout.tv_sec += 2;
- error = pthread_cond_timedwait(&pool->worker2parent, &pool->lock,
- &tmout);
-
- if (pool->thread_count != 0)
- return 0;
-
- return pr_op_errno(error, "Waiting thread to start");
+ pr_op_debug("Destroyed.");
}
/*
return error;
mutex_lock(pool);
-
- error = validate_thread_count(pool);
- if (error) {
- mutex_unlock(pool);
- task_destroy(task);
- return error;
- }
-
task_queue_push(pool, task);
mutex_unlock(pool);
+
+ /*
+ * Note: This assumes the threads have already spawned.
+ * If not, they will claim work once they spawn anyway.
+ */
signal_to_worker(pool);
return 0;
}
bool result;
mutex_lock(pool);
- result = (pool->working_count < pool->thread_count);
+ result = (pool->working_count < pool->thread_ids_len);
mutex_unlock(pool);
return result;
{
mutex_lock(pool);
- while (true) {
- pr_op_debug("- Stop: %s", pool->stop ? "true" : "false");
- pr_op_debug("- Working count: %u", pool->working_count);
- pr_op_debug("- Thread count: %u", pool->thread_count);
- pr_op_debug("- Empty task queue: %s",
- TAILQ_EMPTY(&pool->queue) ? "true" : "false");
-
- if (pool->stop) {
- /* Wait until all Working Threads are dead. */
- if (pool->thread_count == 0)
- break;
- } else {
- /* Wait until all Working Threads finish. */
- if (pool->working_count == 0 &&
- TAILQ_EMPTY(&pool->queue))
- break;
+ /* If the pool has to stop, the wait will happen during the joins. */
+ while (!pool->stop) {
+ pr_op_debug("- Active workers: %u", pool->working_count);
+ pr_op_debug("- Task queue: %s",
+ TAILQ_EMPTY(&pool->queue) ? "Empty" : "Not Empty");
+
+ if (pool->working_count == 0 && TAILQ_EMPTY(&pool->queue)) {
+ pr_op_debug("Pool '%s': All work has been completed.",
+ pool->name);
+ break;
}
pr_op_debug("Pool '%s': Waiting for tasks to be completed",
}
mutex_unlock(pool);
- pr_op_debug("Pool '%s': Waiting has ended, all tasks done", pool->name);
}