From: Stella Lau Date: Wed, 2 Aug 2017 03:12:06 +0000 (-0700) Subject: Signal after finishing job when queueSize=0 X-Git-Tag: v1.3.1^2~12^2~6^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=73ba58955fa238d23bc7ba483c10b722af5e8c3b;p=thirdparty%2Fzstd.git Signal after finishing job when queueSize=0 --- diff --git a/lib/common/pool.c b/lib/common/pool.c index e25b1d75e..e140f1e88 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -66,6 +66,7 @@ static void* POOL_thread(void* opaque) { for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ pthread_mutex_lock(&ctx->queueMutex); + while (ctx->queueEmpty && !ctx->shutdown) { pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } @@ -82,12 +83,20 @@ static void* POOL_thread(void* opaque) { ctx->queueEmpty = ctx->queueHead == ctx->queueTail; /* Unlock the mutex, signal a pusher, and run the job */ pthread_mutex_unlock(&ctx->queueMutex); - pthread_cond_signal(&ctx->queuePushCond); + + if (ctx->queueSize > 1) { + pthread_cond_signal(&ctx->queuePushCond); + } + job.function(job.opaque); - pthread_mutex_lock(&ctx->queueMutex); - ctx->numThreadsBusy--; - pthread_mutex_unlock(&ctx->queueMutex); + /* If the intended queue size was 0, signal after finishing job */ + if (ctx->queueSize == 1) { + pthread_mutex_lock(&ctx->queueMutex); + ctx->numThreadsBusy--; + pthread_mutex_unlock(&ctx->queueMutex); + pthread_cond_signal(&ctx->queuePushCond); + } } } /* Unreachable */ @@ -168,6 +177,21 @@ size_t POOL_sizeof(POOL_ctx *ctx) { + ctx->numThreads * sizeof(pthread_t); } +/** + * Returns 1 if the queue is full and 0 otherwise. + * + * If the queueSize is 1 (the pool was created with an intended queueSize of 0), + * then a queue is empty if there is a thread free and no job is waiting. + */ +static int isQueueFull(POOL_ctx const* ctx) { + if (ctx->queueSize > 1) { + return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize); + } else { + return ctx->numThreadsBusy == ctx->numThreads || + !ctx->queueEmpty; + } +} + void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { POOL_ctx* const ctx = (POOL_ctx*)ctxVoid; if (!ctx) { return; } @@ -175,22 +199,15 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { pthread_mutex_lock(&ctx->queueMutex); { POOL_job const job = {function, opaque}; - // Wait until there is space in the queue for the new job. - // If the ctx->queueSize is 1 (the pool was created with an - // intended queueSize of 0) and there is no job already waiting, - // wait until there is a thread free for the new job. - size_t newTail = (ctx->queueTail + 1) % ctx->queueSize; - while (ctx->queueHead == newTail && !ctx->shutdown && - (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads || - !ctx->queueEmpty)) { + /* Wait until there is space in the queue for the new job */ + while (isQueueFull(ctx) && !ctx->shutdown) { pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); - newTail = (ctx->queueTail + 1) % ctx->queueSize; } /* The queue is still going => there is space */ if (!ctx->shutdown) { ctx->queueEmpty = 0; ctx->queue[ctx->queueTail] = job; - ctx->queueTail = newTail; + ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; } } pthread_mutex_unlock(&ctx->queueMutex);