if (!ctx) { return NULL; }
for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */
- if (pthread_mutex_lock(&ctx->queueMutex)) { return NULL; }
+ pthread_mutex_lock(&ctx->queueMutex);
while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
- if (pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex)) { return NULL; }
+ pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
}
/* empty => shutting down: so stop */
if (ctx->queueHead == ctx->queueTail) {
- if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; }
+ pthread_mutex_unlock(&ctx->queueMutex);
return opaque;
}
/* Pop a job off the queue */
{ POOL_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
/* Unlock the mutex, signal a pusher, and run the job */
- if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; }
- if (pthread_cond_signal(&ctx->queuePushCond)) { return NULL; }
+ pthread_mutex_unlock(&ctx->queueMutex);
+ pthread_cond_signal(&ctx->queuePushCond);
job.function(job.opaque);
}
}
}
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
- int err = 0;
POOL_ctx *ctx;
/* Check the parameters */
if (!numThreads || !queueSize) { return NULL; }
ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job));
ctx->queueHead = 0;
ctx->queueTail = 0;
- err |= pthread_mutex_init(&ctx->queueMutex, NULL);
- err |= pthread_cond_init(&ctx->queuePushCond, NULL);
- err |= pthread_cond_init(&ctx->queuePopCond, NULL);
+ pthread_mutex_init(&ctx->queueMutex, NULL);
+ pthread_cond_init(&ctx->queuePushCond, NULL);
+ pthread_cond_init(&ctx->queuePopCond, NULL);
ctx->shutdown = 0;
/* Allocate space for the thread handles */
ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t));
ctx->numThreads = 0;
/* Check for errors */
- if (!ctx->threads || !ctx->queue || err) { POOL_free(ctx); return NULL; }
+ if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
/* Initialize the threads */
{ size_t i;
for (i = 0; i < numThreads; ++i) {