]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Replace marker with queueEmpty variable and update pool.h comment
authorStella Lau <laus@fb.com>
Tue, 1 Aug 2017 19:24:55 +0000 (12:24 -0700)
committerStella Lau <laus@fb.com>
Tue, 1 Aug 2017 19:30:16 +0000 (12:30 -0700)
lib/common/pool.c
lib/common/pool.h

index f51beccc59c5d9240175173de9718d6d9510bc0d..e25b1d75e82c20092594a9c1c88bcd37f1a87f49 100644 (file)
@@ -13,8 +13,6 @@
 #include <stdlib.h>  /* malloc, calloc, free */
 #include "pool.h"
 
-#include <stdio.h>
-
 /* ======   Compiler specifics   ====== */
 #if defined(_MSC_VER)
 #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
@@ -36,17 +34,16 @@ struct POOL_ctx_s {
     pthread_t *threads;
     size_t numThreads;
 
-    size_t numThreadsBusy;
-
     /* The queue is a circular buffer */
     POOL_job *queue;
     size_t queueHead;
     size_t queueTail;
     size_t queueSize;
 
-    size_t jobsQueued;
-
-    size_t marker;
+    /* The number of threads working on jobs */
+    size_t numThreadsBusy;
+    /* Indicates if the queue is empty */
+    int queueEmpty;
 
     /* The mutex protects the queue */
     pthread_mutex_t queueMutex;
@@ -69,12 +66,11 @@ static void* POOL_thread(void* opaque) {
     for (;;) {
         /* Lock the mutex and wait for a non-empty queue or until shutdown */
         pthread_mutex_lock(&ctx->queueMutex);
-//        while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
-        while (!ctx->jobsQueued && !ctx->shutdown && !ctx->marker) {
+        while (ctx->queueEmpty && !ctx->shutdown) {
             pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
         }
         /* empty => shutting down: so stop */
-        if (!ctx->jobsQueued && !ctx->marker) {
+        if (ctx->queueEmpty) {
             pthread_mutex_unlock(&ctx->queueMutex);
             return opaque;
         }
@@ -82,9 +78,8 @@ static void* POOL_thread(void* opaque) {
         {
             POOL_job const job = ctx->queue[ctx->queueHead];
             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
-            ctx->jobsQueued--;
             ctx->numThreadsBusy++;
-            ctx->marker = 0;
+            ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
             /* Unlock the mutex, signal a pusher, and run the job */
             pthread_mutex_unlock(&ctx->queueMutex);
             pthread_cond_signal(&ctx->queuePushCond);
@@ -114,8 +109,7 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
     ctx->queueHead = 0;
     ctx->queueTail = 0;
     ctx->numThreadsBusy = 0;
-    ctx->jobsQueued = 0;
-    ctx->marker = 0;
+    ctx->queueEmpty = 1;
     (void)pthread_mutex_init(&ctx->queueMutex, NULL);
     (void)pthread_cond_init(&ctx->queuePushCond, NULL);
     (void)pthread_cond_init(&ctx->queuePopCond, NULL);
@@ -180,22 +174,23 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
 
     pthread_mutex_lock(&ctx->queueMutex);
     {   POOL_job const job = {function, opaque};
-        /* Wait until there is space in the queue for the new job */
+
+        // Wait until there is space in the queue for the new job.
+        // If the ctx->queueSize is 1 (the pool was created with an
+        // intended queueSize of 0) and there is no job already waiting,
+        // wait until there is a thread free for the new job.
         size_t newTail = (ctx->queueTail + 1) % ctx->queueSize;
         while (ctx->queueHead == newTail && !ctx->shutdown &&
                (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads ||
-                ctx->marker)) {
+                !ctx->queueEmpty)) {
           pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
           newTail = (ctx->queueTail + 1) % ctx->queueSize;
         }
         /* The queue is still going => there is space */
         if (!ctx->shutdown) {
-            if (ctx->queueSize > 1) {
-              ctx->marker = 1;
-            }
+            ctx->queueEmpty = 0;
             ctx->queue[ctx->queueTail] = job;
             ctx->queueTail = newTail;
-            ctx->jobsQueued++;
         }
     }
     pthread_mutex_unlock(&ctx->queueMutex);
index 957100f4625f597aba78bf7579e8e5f5cceacee1..ed271195077d885633c3c19f41a9a0b2ffaecf3e 100644 (file)
@@ -22,7 +22,6 @@ typedef struct POOL_ctx_s POOL_ctx;
  *  Create a thread pool with at most `numThreads` threads.
  * `numThreads` must be at least 1.
  *  The maximum number of queued jobs before blocking is `queueSize`.
- * `queueSize` must be at least 1.
  * @return : POOL_ctx pointer on success, else NULL.
 */
 POOL_ctx *POOL_create(size_t numThreads, size_t queueSize);