]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Fix pool for threading.h
authorNick Terrell <terrelln@fb.com>
Sun, 1 Jan 2017 00:10:47 +0000 (19:10 -0500)
committerNick Terrell <terrelln@fb.com>
Sun, 1 Jan 2017 00:10:47 +0000 (19:10 -0500)
lib/common/pool.c
lib/common/pool.h

index 97ca7ddab2d968da18a9804b0e2496f553f66e1d..e24691f7791f5d393f0530758ed72f5753e37103 100644 (file)
@@ -51,21 +51,21 @@ static void* POOL_thread(void* opaque) {
     if (!ctx) { return NULL; }
     for (;;) {
         /* Lock the mutex and wait for a non-empty queue or until shutdown */
-        if (pthread_mutex_lock(&ctx->queueMutex)) { return NULL; }
+        pthread_mutex_lock(&ctx->queueMutex);
         while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
-            if (pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex)) { return NULL; }
+            pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
         }
         /* empty => shutting down: so stop */
         if (ctx->queueHead == ctx->queueTail) {
-            if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; }
+            pthread_mutex_unlock(&ctx->queueMutex);
             return opaque;
         }
         /* Pop a job off the queue */
         {   POOL_job const job = ctx->queue[ctx->queueHead];
             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
             /* Unlock the mutex, signal a pusher, and run the job */
-            if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; }
-            if (pthread_cond_signal(&ctx->queuePushCond)) { return NULL; }
+            pthread_mutex_unlock(&ctx->queueMutex);
+            pthread_cond_signal(&ctx->queuePushCond);
             job.function(job.opaque);
         }
     }
@@ -73,7 +73,6 @@ static void* POOL_thread(void* opaque) {
 }
 
 POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
-    int err = 0;
     POOL_ctx *ctx;
     /* Check the parameters */
     if (!numThreads || !queueSize) { return NULL; }
@@ -88,15 +87,15 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
     ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job));
     ctx->queueHead = 0;
     ctx->queueTail = 0;
-    err |= pthread_mutex_init(&ctx->queueMutex, NULL);
-    err |= pthread_cond_init(&ctx->queuePushCond, NULL);
-    err |= pthread_cond_init(&ctx->queuePopCond, NULL);
+    pthread_mutex_init(&ctx->queueMutex, NULL);
+    pthread_cond_init(&ctx->queuePushCond, NULL);
+    pthread_cond_init(&ctx->queuePopCond, NULL);
     ctx->shutdown = 0;
     /* Allocate space for the thread handles */
     ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t));
     ctx->numThreads = 0;
     /* Check for errors */
-    if (!ctx->threads || !ctx->queue || err) { POOL_free(ctx); return NULL; }
+    if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
     /* Initialize the threads */
     {   size_t i;
         for (i = 0; i < numThreads; ++i) {
index f4afc1ee3d239240cfbd0c28188835abc80a3b60..c26f543fcbd8f236faa6404cbcc1dce4191806d0 100644 (file)
@@ -39,6 +39,7 @@ typedef void (*POOL_add_function)(void *, POOL_function, void *);
 /*! POOL_add() :
     Add the job `function(opaque)` to the thread pool.
     Possibly blocks until there is room in the queue.
+    Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed.
 */
 void POOL_add(void *ctx, POOL_function function, void *opaque);