]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Allow queueSize=0 in pool.c and update poolTests
authorStella Lau <laus@fb.com>
Mon, 31 Jul 2017 17:10:16 +0000 (10:10 -0700)
committerStella Lau <laus@fb.com>
Mon, 31 Jul 2017 17:10:16 +0000 (10:10 -0700)
lib/common/pool.c
tests/poolTests.c

index aeaca7e791d6b538f618c0c1ea058e304d20c8d8..f51beccc59c5d9240175173de9718d6d9510bc0d 100644 (file)
@@ -13,6 +13,8 @@
 #include <stdlib.h>  /* malloc, calloc, free */
 #include "pool.h"
 
+#include <stdio.h>
+
 /* ======   Compiler specifics   ====== */
 #if defined(_MSC_VER)
 #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
@@ -34,11 +36,18 @@ struct POOL_ctx_s {
     pthread_t *threads;
     size_t numThreads;
 
+    size_t numThreadsBusy;
+
     /* The queue is a circular buffer */
     POOL_job *queue;
     size_t queueHead;
     size_t queueTail;
     size_t queueSize;
+
+    size_t jobsQueued;
+
+    size_t marker;
+
     /* The mutex protects the queue */
     pthread_mutex_t queueMutex;
     /* Condition variable for pushers to wait on when the queue is full */
@@ -60,21 +69,30 @@ static void* POOL_thread(void* opaque) {
     for (;;) {
         /* Lock the mutex and wait for a non-empty queue or until shutdown */
         pthread_mutex_lock(&ctx->queueMutex);
-        while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
+//        while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
+        while (!ctx->jobsQueued && !ctx->shutdown && !ctx->marker) {
             pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
         }
         /* empty => shutting down: so stop */
-        if (ctx->queueHead == ctx->queueTail) {
+        if (!ctx->jobsQueued && !ctx->marker) {
             pthread_mutex_unlock(&ctx->queueMutex);
             return opaque;
         }
         /* Pop a job off the queue */
-        {   POOL_job const job = ctx->queue[ctx->queueHead];
+        {
+            POOL_job const job = ctx->queue[ctx->queueHead];
             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
+            ctx->jobsQueued--;
+            ctx->numThreadsBusy++;
+            ctx->marker = 0;
             /* Unlock the mutex, signal a pusher, and run the job */
             pthread_mutex_unlock(&ctx->queueMutex);
             pthread_cond_signal(&ctx->queuePushCond);
             job.function(job.opaque);
+
+            pthread_mutex_lock(&ctx->queueMutex);
+            ctx->numThreadsBusy--;
+            pthread_mutex_unlock(&ctx->queueMutex);
         }
     }
     /* Unreachable */
@@ -83,7 +101,7 @@ static void* POOL_thread(void* opaque) {
 POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
     POOL_ctx *ctx;
     /* Check the parameters */
-    if (!numThreads || !queueSize) { return NULL; }
+    if (!numThreads) { return NULL; }
     /* Allocate the context and zero initialize */
     ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx));
     if (!ctx) { return NULL; }
@@ -95,6 +113,9 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
     ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job));
     ctx->queueHead = 0;
     ctx->queueTail = 0;
+    ctx->numThreadsBusy = 0;
+    ctx->jobsQueued = 0;
+    ctx->marker = 0;
     (void)pthread_mutex_init(&ctx->queueMutex, NULL);
     (void)pthread_cond_init(&ctx->queuePushCond, NULL);
     (void)pthread_cond_init(&ctx->queuePopCond, NULL);
@@ -161,14 +182,20 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
     {   POOL_job const job = {function, opaque};
         /* Wait until there is space in the queue for the new job */
         size_t newTail = (ctx->queueTail + 1) % ctx->queueSize;
-        while (ctx->queueHead == newTail && !ctx->shutdown) {
+        while (ctx->queueHead == newTail && !ctx->shutdown &&
+               (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads ||
+                ctx->marker)) {
           pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
           newTail = (ctx->queueTail + 1) % ctx->queueSize;
         }
         /* The queue is still going => there is space */
         if (!ctx->shutdown) {
+            if (ctx->queueSize > 1) {
+              ctx->marker = 1;
+            }
             ctx->queue[ctx->queueTail] = job;
             ctx->queueTail = newTail;
+            ctx->jobsQueued++;
         }
     }
     pthread_mutex_unlock(&ctx->queueMutex);
index adc5947df629525d582306f3664109b279178099..09e6d6adf08ab485931d5fda9f6e6b203509b450 100644 (file)
@@ -54,7 +54,7 @@ int main(int argc, const char **argv) {
   size_t numThreads;
   for (numThreads = 1; numThreads <= 4; ++numThreads) {
     size_t queueSize;
-    for (queueSize = 1; queueSize <= 2; ++queueSize) {
+    for (queueSize = 0; queueSize <= 2; ++queueSize) {
       if (testOrder(numThreads, queueSize)) {
         printf("FAIL: testOrder\n");
         return 1;
@@ -64,7 +64,7 @@ int main(int argc, const char **argv) {
   printf("PASS: testOrder\n");
   (void)argc;
   (void)argv;
-  return (POOL_create(0, 1) || POOL_create(1, 0)) ? printf("FAIL: testInvalid\n"), 1
-                                                  : printf("PASS: testInvalid\n"), 0;
+  return (POOL_create(0, 1)) ? printf("FAIL: testInvalid\n"), 1
+                             : printf("PASS: testInvalid\n"), 0;
   return 0;
 }