From: Stella Lau Date: Mon, 31 Jul 2017 17:10:16 +0000 (-0700) Subject: Allow queueSize=0 in pool.c and update poolTests X-Git-Tag: v1.3.1^2~12^2~6^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5adceeed0175e5312c9b68f00494ecd014aa81d6;p=thirdparty%2Fzstd.git Allow queueSize=0 in pool.c and update poolTests --- diff --git a/lib/common/pool.c b/lib/common/pool.c index aeaca7e79..f51beccc5 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -13,6 +13,8 @@ #include /* malloc, calloc, free */ #include "pool.h" +#include + /* ====== Compiler specifics ====== */ #if defined(_MSC_VER) # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ @@ -34,11 +36,18 @@ struct POOL_ctx_s { pthread_t *threads; size_t numThreads; + size_t numThreadsBusy; + /* The queue is a circular buffer */ POOL_job *queue; size_t queueHead; size_t queueTail; size_t queueSize; + + size_t jobsQueued; + + size_t marker; + /* The mutex protects the queue */ pthread_mutex_t queueMutex; /* Condition variable for pushers to wait on when the queue is full */ @@ -60,21 +69,30 @@ static void* POOL_thread(void* opaque) { for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ pthread_mutex_lock(&ctx->queueMutex); - while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { +// while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { + while (!ctx->jobsQueued && !ctx->shutdown && !ctx->marker) { pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } /* empty => shutting down: so stop */ - if (ctx->queueHead == ctx->queueTail) { + if (!ctx->jobsQueued && !ctx->marker) { pthread_mutex_unlock(&ctx->queueMutex); return opaque; } /* Pop a job off the queue */ - { POOL_job const job = ctx->queue[ctx->queueHead]; + { + POOL_job const job = ctx->queue[ctx->queueHead]; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; + ctx->jobsQueued--; + ctx->numThreadsBusy++; + ctx->marker = 0; /* Unlock the mutex, signal a pusher, and run the job */ pthread_mutex_unlock(&ctx->queueMutex); pthread_cond_signal(&ctx->queuePushCond); job.function(job.opaque); + + pthread_mutex_lock(&ctx->queueMutex); + ctx->numThreadsBusy--; + pthread_mutex_unlock(&ctx->queueMutex); } } /* Unreachable */ @@ -83,7 +101,7 @@ static void* POOL_thread(void* opaque) { POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { POOL_ctx *ctx; /* Check the parameters */ - if (!numThreads || !queueSize) { return NULL; } + if (!numThreads) { return NULL; } /* Allocate the context and zero initialize */ ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx)); if (!ctx) { return NULL; } @@ -95,6 +113,9 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job)); ctx->queueHead = 0; ctx->queueTail = 0; + ctx->numThreadsBusy = 0; + ctx->jobsQueued = 0; + ctx->marker = 0; (void)pthread_mutex_init(&ctx->queueMutex, NULL); (void)pthread_cond_init(&ctx->queuePushCond, NULL); (void)pthread_cond_init(&ctx->queuePopCond, NULL); @@ -161,14 +182,20 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { { POOL_job const job = {function, opaque}; /* Wait until there is space in the queue for the new job */ size_t newTail = (ctx->queueTail + 1) % ctx->queueSize; - while (ctx->queueHead == newTail && !ctx->shutdown) { + while (ctx->queueHead == newTail && !ctx->shutdown && + (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads || + ctx->marker)) { pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); newTail = (ctx->queueTail + 1) % ctx->queueSize; } /* The queue is still going => there is space */ if (!ctx->shutdown) { + if (ctx->queueSize > 1) { + ctx->marker = 1; + } ctx->queue[ctx->queueTail] = job; ctx->queueTail = newTail; + ctx->jobsQueued++; } } pthread_mutex_unlock(&ctx->queueMutex); diff --git a/tests/poolTests.c b/tests/poolTests.c index adc5947df..09e6d6adf 100644 --- a/tests/poolTests.c +++ b/tests/poolTests.c @@ -54,7 +54,7 @@ int main(int argc, const char **argv) { size_t numThreads; for (numThreads = 1; numThreads <= 4; ++numThreads) { size_t queueSize; - for (queueSize = 1; queueSize <= 2; ++queueSize) { + for (queueSize = 0; queueSize <= 2; ++queueSize) { if (testOrder(numThreads, queueSize)) { printf("FAIL: testOrder\n"); return 1; @@ -64,7 +64,7 @@ int main(int argc, const char **argv) { printf("PASS: testOrder\n"); (void)argc; (void)argv; - return (POOL_create(0, 1) || POOL_create(1, 0)) ? printf("FAIL: testInvalid\n"), 1 - : printf("PASS: testInvalid\n"), 0; + return (POOL_create(0, 1)) ? printf("FAIL: testInvalid\n"), 1 + : printf("PASS: testInvalid\n"), 0; return 0; }