for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */
pthread_mutex_lock(&ctx->queueMutex);
+
while (ctx->queueEmpty && !ctx->shutdown) {
pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
}
ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
/* Unlock the mutex, signal a pusher, and run the job */
pthread_mutex_unlock(&ctx->queueMutex);
- pthread_cond_signal(&ctx->queuePushCond);
+
+ if (ctx->queueSize > 1) {
+ pthread_cond_signal(&ctx->queuePushCond);
+ }
+
job.function(job.opaque);
- pthread_mutex_lock(&ctx->queueMutex);
- ctx->numThreadsBusy--;
- pthread_mutex_unlock(&ctx->queueMutex);
+ /* If the intended queue size was 0, signal after finishing job */
+ if (ctx->queueSize == 1) {
+ pthread_mutex_lock(&ctx->queueMutex);
+ ctx->numThreadsBusy--;
+ pthread_mutex_unlock(&ctx->queueMutex);
+ pthread_cond_signal(&ctx->queuePushCond);
+ }
}
}
/* Unreachable */
+ ctx->numThreads * sizeof(pthread_t);
}
+/**
+ * Returns 1 if the queue is full and 0 otherwise.
+ *
+ * If the queueSize is 1 (the pool was created with an intended queueSize of 0),
+ * then a queue is empty if there is a thread free and no job is waiting.
+ */
+static int isQueueFull(POOL_ctx const* ctx) {
+ if (ctx->queueSize > 1) {
+ return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
+ } else {
+ return ctx->numThreadsBusy == ctx->numThreads ||
+ !ctx->queueEmpty;
+ }
+}
+
void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
if (!ctx) { return; }
pthread_mutex_lock(&ctx->queueMutex);
{ POOL_job const job = {function, opaque};
- // Wait until there is space in the queue for the new job.
- // If the ctx->queueSize is 1 (the pool was created with an
- // intended queueSize of 0) and there is no job already waiting,
- // wait until there is a thread free for the new job.
- size_t newTail = (ctx->queueTail + 1) % ctx->queueSize;
- while (ctx->queueHead == newTail && !ctx->shutdown &&
- (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads ||
- !ctx->queueEmpty)) {
+ /* Wait until there is space in the queue for the new job */
+ while (isQueueFull(ctx) && !ctx->shutdown) {
pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
- newTail = (ctx->queueTail + 1) % ctx->queueSize;
}
/* The queue is still going => there is space */
if (!ctx->shutdown) {
ctx->queueEmpty = 0;
ctx->queue[ctx->queueTail] = job;
- ctx->queueTail = newTail;
+ ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
}
}
pthread_mutex_unlock(&ctx->queueMutex);