]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Signal after finishing job when queueSize=0
authorStella Lau <laus@fb.com>
Wed, 2 Aug 2017 03:12:06 +0000 (20:12 -0700)
committerStella Lau <laus@fb.com>
Wed, 2 Aug 2017 03:12:06 +0000 (20:12 -0700)
lib/common/pool.c

index e25b1d75e82c20092594a9c1c88bcd37f1a87f49..e140f1e88961e5628933ad9c8012b04253d8fe5c 100644 (file)
@@ -66,6 +66,7 @@ static void* POOL_thread(void* opaque) {
     for (;;) {
         /* Lock the mutex and wait for a non-empty queue or until shutdown */
         pthread_mutex_lock(&ctx->queueMutex);
+
         while (ctx->queueEmpty && !ctx->shutdown) {
             pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
         }
@@ -82,12 +83,20 @@ static void* POOL_thread(void* opaque) {
             ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
             /* Unlock the mutex, signal a pusher, and run the job */
             pthread_mutex_unlock(&ctx->queueMutex);
-            pthread_cond_signal(&ctx->queuePushCond);
+
+            if (ctx->queueSize > 1) {
+                pthread_cond_signal(&ctx->queuePushCond);
+            }
+
             job.function(job.opaque);
 
-            pthread_mutex_lock(&ctx->queueMutex);
-            ctx->numThreadsBusy--;
-            pthread_mutex_unlock(&ctx->queueMutex);
+            /* If the intended queue size was 0, signal after finishing job */
+            if (ctx->queueSize == 1) {
+                pthread_mutex_lock(&ctx->queueMutex);
+                ctx->numThreadsBusy--;
+                pthread_mutex_unlock(&ctx->queueMutex);
+                pthread_cond_signal(&ctx->queuePushCond);
+            }
         }
     }
     /* Unreachable */
@@ -168,6 +177,21 @@ size_t POOL_sizeof(POOL_ctx *ctx) {
         + ctx->numThreads * sizeof(pthread_t);
 }
 
+/**
+ * Returns 1 if the queue is full and 0 otherwise.
+ *
+ * If the queueSize is 1 (the pool was created with an intended queueSize of 0),
+ * then a queue is empty if there is a thread free and no job is waiting.
+ */
+static int isQueueFull(POOL_ctx const* ctx) {
+    if (ctx->queueSize > 1) {
+        return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
+    } else {
+        return ctx->numThreadsBusy == ctx->numThreads ||
+               !ctx->queueEmpty;
+    }
+}
+
 void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
     POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
     if (!ctx) { return; }
@@ -175,22 +199,15 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
     pthread_mutex_lock(&ctx->queueMutex);
     {   POOL_job const job = {function, opaque};
 
-        // Wait until there is space in the queue for the new job.
-        // If the ctx->queueSize is 1 (the pool was created with an
-        // intended queueSize of 0) and there is no job already waiting,
-        // wait until there is a thread free for the new job.
-        size_t newTail = (ctx->queueTail + 1) % ctx->queueSize;
-        while (ctx->queueHead == newTail && !ctx->shutdown &&
-               (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads ||
-                !ctx->queueEmpty)) {
+        /* Wait until there is space in the queue for the new job */
+        while (isQueueFull(ctx) && !ctx->shutdown) {
           pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
-          newTail = (ctx->queueTail + 1) % ctx->queueSize;
         }
         /* The queue is still going => there is space */
         if (!ctx->shutdown) {
             ctx->queueEmpty = 0;
             ctx->queue[ctx->queueTail] = job;
-            ctx->queueTail = newTail;
+            ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
         }
     }
     pthread_mutex_unlock(&ctx->queueMutex);