]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
use pthread_cond to send signals between threads
authorYann Collet <cyan@fb.com>
Sun, 1 Jan 2017 16:31:33 +0000 (17:31 +0100)
committerYann Collet <cyan@fb.com>
Sun, 1 Jan 2017 16:31:33 +0000 (17:31 +0100)
lib/compress/zstdmt_compress.c

index 770f59758b66a306fb4b802535e1ca22ebc221b6..b9cc81f67183d7650dcf9b07d11d3cab2c9ae0fa 100644 (file)
@@ -115,6 +115,7 @@ typedef struct {
     size_t cSize;
     unsigned jobCompleted;
     pthread_mutex_t* jobCompleted_mutex;
+    pthread_cond_t* jobCompleted_cond;
 } ZSTDMT_jobDescription;
 
 /* ZSTDMT_compressFrame() : POOL_function type */
@@ -126,7 +127,9 @@ void ZSTDMT_compressFrame(void* jobDescription)
     job->cSize = ZSTD_compressCCtx(job->cctx, job->dstBuff.start, job->dstBuff.size, job->srcStart, job->srcSize, job->compressionLevel);
     DEBUGLOG(5, "compressed to %u bytes  ", (unsigned)job->cSize);
     job->jobCompleted = 1;
-    DEBUGLOG(5, "unlocking mutex jobCompleted_mutex");
+    DEBUGLOG(5, "sending jobCompleted signal");
+    pthread_mutex_lock(job->jobCompleted_mutex);
+    pthread_cond_signal(job->jobCompleted_cond);
     pthread_mutex_unlock(job->jobCompleted_mutex);
     DEBUGLOG(5, "ZSTDMT_compressFrame completed");
 }
@@ -188,6 +191,7 @@ struct ZSTDMT_CCtx_s {
     ZSTDMT_CCtxPool* cctxPool;
     unsigned nbThreads;
     pthread_mutex_t jobCompleted_mutex;
+    pthread_cond_t jobCompleted_cond;
     ZSTDMT_jobDescription jobs[1];   /* variable size */
 };
 
@@ -201,6 +205,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
     cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
     cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
     pthread_mutex_init(&cctx->jobCompleted_mutex, NULL);
+    pthread_cond_init(&cctx->jobCompleted_cond, NULL);
     return cctx;
 }
 
@@ -248,6 +253,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
             mtctx->jobs[u].frameID = u;
             mtctx->jobs[u].jobCompleted = 0;
             mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
+            mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
 
             DEBUGLOG(3, "posting job %u   (%u bytes)", u, (U32)frameSize);
             POOL_add(mtctx->factory, ZSTDMT_compressFrame, &mtctx->jobs[u]);
@@ -261,10 +267,14 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
         size_t dstPos = 0;
         for (frameID=0; frameID<nbFrames; frameID++) {
             DEBUGLOG(3, "ready to write frame %u ", frameID);
+
+            pthread_mutex_lock(&mtctx->jobCompleted_mutex);
             while (mtctx->jobs[frameID].jobCompleted==0) {
-                DEBUGLOG(4, "waiting for signal jobCompleted_mutex")
-                pthread_mutex_lock(&mtctx->jobCompleted_mutex);
+                DEBUGLOG(4, "waiting for jobCompleted signal for frame %u", frameID);
+                pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
             }
+            pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
+
             {   size_t const cSize = mtctx->jobs[frameID].cSize;
                 if (ZSTD_isError(cSize)) return cSize;
                 if (dstPos + cSize > dstCapacity) return ERROR(dstSize_tooSmall);