From bb13387d7d04253713ed50c2e77c96df00a26d75 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Sat, 31 Dec 2016 19:10:47 -0500 Subject: [PATCH] Fix pool for threading.h --- lib/common/pool.c | 19 +++++++++---------- lib/common/pool.h | 1 + 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/common/pool.c b/lib/common/pool.c index 97ca7ddab..e24691f77 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -51,21 +51,21 @@ static void* POOL_thread(void* opaque) { if (!ctx) { return NULL; } for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ - if (pthread_mutex_lock(&ctx->queueMutex)) { return NULL; } + pthread_mutex_lock(&ctx->queueMutex); while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { - if (pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex)) { return NULL; } + pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } /* empty => shutting down: so stop */ if (ctx->queueHead == ctx->queueTail) { - if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; } + pthread_mutex_unlock(&ctx->queueMutex); return opaque; } /* Pop a job off the queue */ { POOL_job const job = ctx->queue[ctx->queueHead]; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; /* Unlock the mutex, signal a pusher, and run the job */ - if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; } - if (pthread_cond_signal(&ctx->queuePushCond)) { return NULL; } + pthread_mutex_unlock(&ctx->queueMutex); + pthread_cond_signal(&ctx->queuePushCond); job.function(job.opaque); } } @@ -73,7 +73,6 @@ static void* POOL_thread(void* opaque) { } POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { - int err = 0; POOL_ctx *ctx; /* Check the parameters */ if (!numThreads || !queueSize) { return NULL; } @@ -88,15 +87,15 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job)); ctx->queueHead = 0; ctx->queueTail = 0; - err |= pthread_mutex_init(&ctx->queueMutex, NULL); - err |= pthread_cond_init(&ctx->queuePushCond, NULL); - err |= pthread_cond_init(&ctx->queuePopCond, NULL); + pthread_mutex_init(&ctx->queueMutex, NULL); + pthread_cond_init(&ctx->queuePushCond, NULL); + pthread_cond_init(&ctx->queuePopCond, NULL); ctx->shutdown = 0; /* Allocate space for the thread handles */ ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t)); ctx->numThreads = 0; /* Check for errors */ - if (!ctx->threads || !ctx->queue || err) { POOL_free(ctx); return NULL; } + if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } /* Initialize the threads */ { size_t i; for (i = 0; i < numThreads; ++i) { diff --git a/lib/common/pool.h b/lib/common/pool.h index f4afc1ee3..c26f543fc 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -39,6 +39,7 @@ typedef void (*POOL_add_function)(void *, POOL_function, void *); /*! POOL_add() : Add the job `function(opaque)` to the thread pool. Possibly blocks until there is room in the queue. + Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed. */ void POOL_add(void *ctx, POOL_function function, void *opaque); -- 2.47.3