]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt: there is now one mutex/cond per job
authorYann Collet <cyan@fb.com>
Sat, 27 Jan 2018 01:48:33 +0000 (17:48 -0800)
committerYann Collet <cyan@fb.com>
Sat, 27 Jan 2018 01:55:08 +0000 (17:55 -0800)
lib/compress/zstdmt_compress.c
tests/zstreamtest.c

index e0486450751a1c21585c15b9154dbca69cfca81a..22560bfb1d16d5f04578251f59d9cc959e3a8b96 100644 (file)
@@ -310,8 +310,8 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 typedef struct {
     size_t   consumed;                   /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
     size_t   cSize;                      /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
-    ZSTD_pthread_mutex_t* mtctx_mutex;   /* Thread-safe - used by mtctx and (all) workers */
-    ZSTD_pthread_cond_t* mtctx_cond;     /* Thread-safe - used by mtctx and (all) workers */
+    ZSTD_pthread_mutex_t job_mutex;      /* Thread-safe - used by mtctx and worker */
+    ZSTD_pthread_cond_t job_cond;        /* Thread-safe - used by mtctx and worker */
     ZSTDMT_CCtxPool* cctxPool;           /* Thread-safe - used by mtctx and (all) workers */
     ZSTDMT_bufferPool* bufPool;          /* Thread-safe - used by mtctx and (all) workers */
     buffer_t dstBuff;                    /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
@@ -395,13 +395,13 @@ void ZSTDMT_compressChunk(void* jobDescription)
             ip += blockSize;
             op += cSize; assert(op < oend);
             /* stats */
-            ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);   /* note : it's a mtctx mutex */
+            ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   /* note : it's a mtctx mutex */
             job->cSize += cSize;
             job->consumed = blockSize * blockNb;
             DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
                         (U32)cSize, (U32)job->cSize);
-            ZSTD_pthread_cond_signal(job->mtctx_cond);   /* warns some more data is ready to be flushed */
-            ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+            ZSTD_pthread_cond_signal(&job->job_cond);   /* warns some more data is ready to be flushed */
+            ZSTD_pthread_mutex_unlock(&job->job_mutex);
         }
         /* last block */
         assert(blockSize > 0); assert((blockSize & (blockSize - 1)) == 0);  /* blockSize must be power of 2 for mask==(blockSize-1) to work */
@@ -413,9 +413,9 @@ void ZSTDMT_compressChunk(void* jobDescription)
                  ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
             if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
             /* stats */
-            ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
+            ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
             job->cSize += cSize;
-            ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+            ZSTD_pthread_mutex_unlock(&job->job_mutex);
     }   }
 
 _endJob:
@@ -424,10 +424,10 @@ _endJob:
     ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff);
     job->srcBuff = g_nullBuffer; job->prefixStart = NULL;
     /* report */
-    ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
     job->consumed = job->srcSize;
-    ZSTD_pthread_cond_signal(job->mtctx_cond);
-    ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+    ZSTD_pthread_cond_signal(&job->job_cond);
+    ZSTD_pthread_mutex_unlock(&job->job_mutex);
 }
 
 
@@ -447,8 +447,6 @@ struct ZSTDMT_CCtx_s {
     ZSTDMT_jobDescription* jobs;
     ZSTDMT_bufferPool* bufPool;
     ZSTDMT_CCtxPool* cctxPool;
-    ZSTD_pthread_mutex_t mtctx_mutex;
-    ZSTD_pthread_cond_t mtctx_cond;
     ZSTD_CCtx_params params;
     size_t targetSectionSize;
     size_t targetPrefixSize;
@@ -470,16 +468,34 @@ struct ZSTDMT_CCtx_s {
 };
 
 /* ZSTDMT_allocJobsTable()
- * allocate, and just init to zero, a job table.
+ * allocate and init a job table.
  * update *nbJobsPtr to next power of 2 value, as size of table
  * No reverse free() function is provided : just use ZSTD_free() */
-static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
+static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
 {
     U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
     U32 const nbJobs = 1 << nbJobsLog2;
-    *nbJobsPtr = nbJobs;
-    return (ZSTDMT_jobDescription*) ZSTD_calloc(
+    ZSTDMT_jobDescription* const jobTable = ZSTD_calloc(
                             nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
+    U32 jobNb;
+    if (jobTable==NULL) return NULL;
+    *nbJobsPtr = nbJobs;
+    for (jobNb=0; jobNb<nbJobs; jobNb++) {
+        ZSTD_pthread_mutex_init(&jobTable[jobNb].job_mutex, NULL);   /* <======== should check init result */
+        ZSTD_pthread_cond_init(&jobTable[jobNb].job_cond, NULL);  /* <======== should check init result */
+    }
+    return jobTable;
+}
+
+static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem)
+{
+    U32 jobNb;
+    if (jobTable == NULL) return;
+    for (jobNb=0; jobNb<nbJobs; jobNb++) {
+        ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex);
+        ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond);
+    }
+    ZSTD_free(jobTable, cMem);
 }
 
 /* ZSTDMT_CCtxParam_setNbThreads():
@@ -512,7 +528,7 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
     mtctx->cMem = cMem;
     mtctx->allJobsCompleted = 1;
     mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem);
-    mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
+    mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
     assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0);  /* ensure nbJobs is a power of 2 */
     mtctx->jobIDMask = nbJobs - 1;
     mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem);
@@ -521,14 +537,6 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
         ZSTDMT_freeCCtx(mtctx);
         return NULL;
     }
-    if (ZSTD_pthread_mutex_init(&mtctx->mtctx_mutex, NULL)) {
-        ZSTDMT_freeCCtx(mtctx);
-        return NULL;
-    }
-    if (ZSTD_pthread_cond_init(&mtctx->mtctx_cond, NULL)) {
-        ZSTDMT_freeCCtx(mtctx);
-        return NULL;
-    }
     DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads);
     return mtctx;
 }
@@ -566,12 +574,12 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
     DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
     while (mtctx->doneJobID < mtctx->nextJobID) {
         unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
-        ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+        ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
         while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) {
             DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
-            ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
+            ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
         }
-        ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+        ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
         mtctx->doneJobID++;
     }
 }
@@ -581,12 +589,10 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
     if (mtctx==NULL) return 0;   /* compatible with free on NULL */
     POOL_free(mtctx->factory);   /* stop and free worker threads */
     ZSTDMT_releaseAllJobResources(mtctx);  /* release job resources into pools first */
-    ZSTD_free(mtctx->jobs, mtctx->cMem);
+    ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
     ZSTDMT_freeBufferPool(mtctx->bufPool);
     ZSTDMT_freeCCtxPool(mtctx->cctxPool);
     ZSTD_freeCDict(mtctx->cdictLocal);
-    ZSTD_pthread_mutex_destroy(&mtctx->mtctx_mutex);
-    ZSTD_pthread_cond_destroy(&mtctx->mtctx_cond);
     ZSTD_free(mtctx, mtctx->cMem);
     return 0;
 }
@@ -671,7 +677,6 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
     ZSTD_frameProgression fps;
     DEBUGLOG(6, "ZSTDMT_getFrameProgression");
-    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
     fps.consumed = mtctx->consumed;
     fps.produced = mtctx->produced;
     assert(mtctx->inBuff.filled >= mtctx->inBuff.prefixSize);
@@ -682,14 +687,16 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
                     mtctx->doneJobID, lastJobNb, mtctx->jobReady)
         for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
             unsigned const wJobID = jobNb & mtctx->jobIDMask;
-            size_t const cResult = mtctx->jobs[wJobID].cSize;
-            size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
-            fps.consumed += mtctx->jobs[wJobID].consumed;
-            fps.ingested += mtctx->jobs[wJobID].srcSize;
-            fps.produced += produced;
+            ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex);
+            {   size_t const cResult = mtctx->jobs[wJobID].cSize;
+                size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
+                fps.consumed += mtctx->jobs[wJobID].consumed;
+                fps.ingested += mtctx->jobs[wJobID].srcSize;
+                fps.produced += produced;
+            }
+            ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         }
     }
-    ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
     return fps;
 }
 
@@ -749,9 +756,9 @@ static size_t ZSTDMT_compress_advanced_internal(
 
     if (nbChunks > mtctx->jobIDMask+1) {  /* enlarge job table */
         U32 nbJobs = nbChunks;
-        ZSTD_free(mtctx->jobs, mtctx->cMem);
+        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
         mtctx->jobIDMask = 0;
-        mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem);
+        mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
         if (mtctx->jobs==NULL) return ERROR(memory_allocation);
         assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0));  /* ensure nbJobs is a power of 2 */
         mtctx->jobIDMask = nbJobs - 1;
@@ -781,8 +788,6 @@ static size_t ZSTDMT_compress_advanced_internal(
             mtctx->jobs[u].bufPool = mtctx->bufPool;
             mtctx->jobs[u].firstChunk = (u==0);
             mtctx->jobs[u].lastChunk = (u==nbChunks-1);
-            mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex;
-            mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond;
 
             if (params.fParams.checksumFlag) {
                 XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
@@ -802,12 +807,12 @@ static size_t ZSTDMT_compress_advanced_internal(
         unsigned chunkID;
         for (chunkID=0; chunkID<nbChunks; chunkID++) {
             DEBUGLOG(5, "waiting for chunk %u ", chunkID);
-            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[chunkID].job_mutex);
             while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) {
                 DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
-                ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
+                ZSTD_pthread_cond_wait(&mtctx->jobs[chunkID].job_cond, &mtctx->jobs[chunkID].job_mutex);
             }
-            ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+            ZSTD_pthread_mutex_unlock(&mtctx->jobs[chunkID].job_mutex);
             DEBUGLOG(5, "ready to write chunk %u ", chunkID);
 
             mtctx->jobs[chunkID].prefixStart = NULL;
@@ -1058,8 +1063,6 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
         mtctx->jobs[jobID].lastChunk = endFrame;
         mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
         mtctx->jobs[jobID].dstFlushed = 0;
-        mtctx->jobs[jobID].mtctx_mutex = &mtctx->mtctx_mutex;
-        mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond;
 
         if (mtctx->params.fParams.checksumFlag)
             XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->inBuff.prefixSize, srcSize);
@@ -1128,7 +1131,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
                 blockToFlush, mtctx->doneJobID, mtctx->nextJobID);
     assert(output->size >= output->pos);
 
-    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
     if (  blockToFlush
       && (mtctx->doneJobID < mtctx->nextJobID) ) {
         assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);
@@ -1140,14 +1143,14 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
             }
             DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
-            ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);  /* block when nothing available to flush but more to come */
+            ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex);  /* block when nothing to flush but some to come */
     }   }
 
     /* try to flush something */
     {   size_t cSize = mtctx->jobs[wJobID].cSize;                  /* shared */
         size_t const srcConsumed = mtctx->jobs[wJobID].consumed;   /* shared */
         size_t const srcSize = mtctx->jobs[wJobID].srcSize;        /* read-only, could be done after mutex lock, but no-declaration-after-statement */
-        ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+        ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         if (ZSTD_isError(cSize)) {
             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
                         mtctx->doneJobID, ZSTD_getErrorName(cSize));
index 16dcba73a6342b82c9fc0b3abcbcb45857568dc5..ecc477c715b18041f0aec810443dee2882a2b627 100644 (file)
@@ -969,9 +969,9 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres
         FUZ_rand(&coreSeed);
         lseed = coreSeed ^ prime32;
         if (nbTests >= testNb) {
-            DISPLAYUPDATE(2, "\r%6u/%6u (%08X)   ", testNb, nbTests, lseed);
+            DISPLAYUPDATE(2, "\r%6u/%6u    ", testNb, nbTests);
         } else {
-            DISPLAYUPDATE(2, "\r%6u  (%08X)        ", testNb, lseed);
+            DISPLAYUPDATE(2, "\r%6u        ", testNb);
         }
 
         /* states full reset (deliberately not synchronized) */