]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt:: renamed mutex and cond to underline they are context-global
authorYann Collet <cyan@fb.com>
Thu, 25 Jan 2018 22:52:34 +0000 (14:52 -0800)
committerYann Collet <cyan@fb.com>
Thu, 25 Jan 2018 22:52:34 +0000 (14:52 -0800)
lib/compress/zstdmt_compress.c

index 0207920dceb8c99d31939844a8595f8e8fe1ccd3..bc9209d5b6627374d33eb858d31a53b3dcc7d8cb 100644 (file)
@@ -316,8 +316,8 @@ typedef struct {
     unsigned lastChunk;
     unsigned jobCompleted;
     unsigned frameChecksumNeeded;
-    ZSTD_pthread_mutex_t* jobCompleted_mutex;
-    ZSTD_pthread_cond_t* jobCompleted_cond;
+    ZSTD_pthread_mutex_t* mtctx_mutex;
+    ZSTD_pthread_cond_t* mtctx_cond;
     ZSTD_CCtx_params params;
     const ZSTD_CDict* cdict;
     ZSTDMT_CCtxPool* cctxPool;
@@ -344,9 +344,9 @@ void ZSTDMT_compressChunk(void* jobDescription)
             job->cSize = ERROR(memory_allocation);
             goto _endJob;
         }
-        ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);   /* note : it's a mtctx mutex */
+        ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
         job->dstBuff = dstBuff;
-        ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+        ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
     }
 
     /* init */
@@ -399,13 +399,13 @@ void ZSTDMT_compressChunk(void* jobDescription)
             ip += ZSTD_BLOCKSIZE_MAX;
             op += cSize; assert(op < oend);
             /* stats */
-            ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);   /* note : it's a mtctx mutex */
+            ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);   /* note : it's a mtctx mutex */
             job->cSize += cSize;
             job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
             DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
                         (U32)cSize, (U32)job->cSize);
-            ZSTD_pthread_cond_signal(job->jobCompleted_cond);
-            ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+            ZSTD_pthread_cond_signal(job->mtctx_cond);
+            ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
         }
         /* last block */
         if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) {
@@ -416,10 +416,10 @@ void ZSTDMT_compressChunk(void* jobDescription)
                  ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
             if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
             /* stats */
-            ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);   /* note : it's a mtctx mutex */
+            ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);   /* note : it's a mtctx mutex */
             job->cSize += cSize;
             job->consumed = job->srcSize;
-            ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+            ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
         }
     }
 #endif
@@ -430,11 +430,11 @@ _endJob:
     ZSTDMT_releaseBuffer(job->bufPool, job->src);
     job->src = g_nullBuffer; job->srcStart = NULL;
     /* report */
-    ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
     job->consumed = job->srcSize;
     job->jobCompleted = 1;
-    ZSTD_pthread_cond_signal(job->jobCompleted_cond);
-    ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
+    ZSTD_pthread_cond_signal(job->mtctx_cond);
+    ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
 }
 
 
@@ -452,8 +452,8 @@ struct ZSTDMT_CCtx_s {
     ZSTDMT_jobDescription* jobs;
     ZSTDMT_bufferPool* bufPool;
     ZSTDMT_CCtxPool* cctxPool;
-    ZSTD_pthread_mutex_t jobCompleted_mutex;
-    ZSTD_pthread_cond_t jobCompleted_cond;
+    ZSTD_pthread_mutex_t mtctx_mutex;
+    ZSTD_pthread_cond_t mtctx_cond;
     ZSTD_CCtx_params params;
     size_t targetSectionSize;
     size_t inBuffSize;
@@ -538,11 +538,11 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
         ZSTDMT_freeCCtx(mtctx);
         return NULL;
     }
-    if (ZSTD_pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL)) {
+    if (ZSTD_pthread_mutex_init(&mtctx->mtctx_mutex, NULL)) {
         ZSTDMT_freeCCtx(mtctx);
         return NULL;
     }
-    if (ZSTD_pthread_cond_init(&mtctx->jobCompleted_cond, NULL)) {
+    if (ZSTD_pthread_cond_init(&mtctx->mtctx_cond, NULL)) {
         ZSTDMT_freeCCtx(mtctx);
         return NULL;
     }
@@ -582,12 +582,12 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
     DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
     while (zcs->doneJobID < zcs->nextJobID) {
         unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
-        ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+        ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
         while (zcs->jobs[jobID].jobCompleted==0) {
             DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID);   /* we want to block when waiting for data to flush */
-            ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
+            ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex);
         }
-        ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+        ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
         zcs->doneJobID++;
     }
 }
@@ -601,8 +601,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
     ZSTDMT_freeBufferPool(mtctx->bufPool);
     ZSTDMT_freeCCtxPool(mtctx->cctxPool);
     ZSTD_freeCDict(mtctx->cdictLocal);
-    ZSTD_pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
-    ZSTD_pthread_cond_destroy(&mtctx->jobCompleted_cond);
+    ZSTD_pthread_mutex_destroy(&mtctx->mtctx_mutex);
+    ZSTD_pthread_cond_destroy(&mtctx->mtctx_cond);
     ZSTD_free(mtctx, mtctx->cMem);
     return 0;
 }
@@ -672,7 +672,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
     ZSTD_frameProgression fs;
     DEBUGLOG(6, "ZSTDMT_getFrameProgression");
-    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
     fs.consumed = mtctx->consumed;
     fs.produced = mtctx->produced;
     assert(mtctx->inBuff.filled >= mtctx->prefixSize);
@@ -690,7 +690,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
             fs.produced += produced;
         }
     }
-    ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
+    ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
     return fs;
 }
 
@@ -783,8 +783,8 @@ static size_t ZSTDMT_compress_advanced_internal(
             mtctx->jobs[u].firstChunk = (u==0);
             mtctx->jobs[u].lastChunk = (u==nbChunks-1);
             mtctx->jobs[u].jobCompleted = 0;
-            mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
-            mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
+            mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex;
+            mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond;
 
             if (params.fParams.checksumFlag) {
                 XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
@@ -804,12 +804,12 @@ static size_t ZSTDMT_compress_advanced_internal(
         unsigned chunkID;
         for (chunkID=0; chunkID<nbChunks; chunkID++) {
             DEBUGLOG(5, "waiting for chunk %u ", chunkID);
-            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
+            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
             while (mtctx->jobs[chunkID].jobCompleted==0) {
                 DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
-                ZSTD_pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
+                ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
             }
-            ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
+            ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
             DEBUGLOG(5, "ready to write chunk %u ", chunkID);
 
             mtctx->jobs[chunkID].srcStart = NULL;
@@ -1035,8 +1035,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD
         zcs->jobs[jobID].jobCompleted = 0;
         zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
         zcs->jobs[jobID].dstFlushed = 0;
-        zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
-        zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+        zcs->jobs[jobID].mtctx_mutex = &zcs->mtctx_mutex;
+        zcs->jobs[jobID].mtctx_cond = &zcs->mtctx_cond;
 
         if (zcs->params.fParams.checksumFlag)
             XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize);
@@ -1067,7 +1067,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD
                 zcs->params.fParams.checksumFlag = 0;
     }   }   }
 
-    DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)",
+    DEBUGLOG(2, "ZSTDMT_createCompressionJob: posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)",
                 zcs->nextJobID,
                 (U32)zcs->jobs[jobID].srcSize,
                 zcs->jobs[jobID].lastChunk,
@@ -1094,18 +1094,18 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, uns
     DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
     assert(output->size >= output->pos);
 
-    ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
     if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) {
         while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
             if (zcs->jobs[wJobID].jobCompleted==1) break;
             DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
                         zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
-            ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);  /* block when nothing available to flush but more to come */
+            ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex);  /* block when nothing available to flush but more to come */
     }   }
 
     /* some output is available to be flushed */
     {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
-        ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+        ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
         if (ZSTD_isError(job.cSize)) {
             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
                         zcs->doneJobID, ZSTD_getErrorName(job.cSize));
@@ -1186,8 +1186,9 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     /* single-pass shortcut (note : synchronous-mode) */
     if ( (mtctx->nextJobID == 0)     /* just started */
       && (mtctx->inBuff.filled == 0) /* nothing buffered */
+      && (!mtctx->jobReady)          /* no job already created */
       && (endOp == ZSTD_e_end)       /* end order */
-      && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */
+      && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */
         size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
                 (char*)output->dst + output->pos, output->size - output->pos,
                 (const char*)input->src + input->pos, input->size - input->pos,
@@ -1234,7 +1235,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
 
     /* check for potential compressed data ready to be flushed */
     {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
-        if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not flush yet */
+        if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not end flush yet */
         return remainingToFlush;
     }
 }