*/
pthread_cond_t working_cond;
/*
- * Used by Working Threads to signal that all the work is done,
- * for the benefit of the Parent Thread.
+ * Used by Working Threads to signal the Parent Thread that workers are
+ * ready (during initialization) or that all the work is done (after
+ * initialization).
*/
pthread_cond_t waiting_cond;
/* Number of Working Threads. */
struct thread_pool *pool = arg;
struct task *task;
+ thread_pool_lock(pool);
+ /* We're running; signal the Parent Thread. */
+ pool->thread_count++;
+ pthread_cond_signal(&(pool->waiting_cond));
+
while (true) {
while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
/* Wait until the parent sends us work. */
goto end;
}
- pool->thread_count++;
pr_op_debug("Pool thread #%u spawned", i);
}
free(pool);
}
+/*
+ * pthread_create() does not guarantee that the threads will have started by the
+ * time work needs to be done.
+ *
+ * If they haven't started, this function will add a little timeout.
+ *
+ * Lock must be held.
+ */
+static int validate_thread_count(struct thread_pool *pool)
+{
+ struct timespec tmout;
+ int error;
+
+ if (pool->thread_count != 0)
+ return 0;
+
+ /* 2 seconds to start a thread */
+ clock_gettime(CLOCK_REALTIME, &tmout);
+ tmout.tv_sec += 2;
+ error = pthread_cond_timedwait(&(pool->waiting_cond), &(pool->lock),
+ &tmout);
+
+ if (pool->thread_count != 0)
+ return 0;
+
+ return pr_op_errno(error, "Waiting thread to start");
+}
+
/*
* Push a new task to @pool, the task to be executed is @cb with the argument
* @arg.
return error;
thread_pool_lock(pool);
+
+ error = validate_thread_count(pool);
+ if (error) {
+ task_destroy(task);
+ goto end;
+ }
+
task_queue_push(&(pool->queue), task);
/* There's work to do! */
pthread_cond_signal(&(pool->working_cond));
- thread_pool_unlock(pool);
- return 0;
+end:
+ thread_pool_unlock(pool);
+ return error;
}
/* There are available threads to work? */