]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
fixed minor reporting discrepancy in MT mode
authorYann Collet <cyan@fb.com>
Wed, 19 Sep 2018 23:30:55 +0000 (16:30 -0700)
committerYann Collet <cyan@fb.com>
Wed, 19 Sep 2018 23:30:55 +0000 (16:30 -0700)
lib/compress/zstdmt_compress.c

index 39255fdcfdd421413cb71df2c9f8cb2f5628f3e1..a7e93dacdf1014983bd2d0da13dda2c18cf10b5b 100644 (file)
@@ -712,7 +712,12 @@ void ZSTDMT_compressionJob(void* jobDescription)
         assert(job->cSize == 0);
         for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
             size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
-            if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
+            if (ZSTD_isError(cSize)) {
+                ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
+                job->cSize = cSize;
+                ZSTD_pthread_mutex_unlock(&job->job_mutex);
+                goto _endJob;
+            }
             ip += chunkSize;
             op += cSize; assert(op < oend);
             /* stats */
@@ -725,7 +730,8 @@ void ZSTDMT_compressionJob(void* jobDescription)
             ZSTD_pthread_mutex_unlock(&job->job_mutex);
         }
         /* last block */
-        assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0);  /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
+        assert(chunkSize > 0);
+        assert((chunkSize & (chunkSize - 1)) == 0);  /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
         if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) {
             size_t const lastBlockSize1 = job->src.size & (chunkSize-1);
             size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1;
@@ -736,6 +742,7 @@ void ZSTDMT_compressionJob(void* jobDescription)
             /* stats */
             ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
             job->cSize += cSize;
+            job->consumed = job->src.size;
             ZSTD_pthread_mutex_unlock(&job->job_mutex);
     }   }
 
@@ -748,10 +755,7 @@ _endJob:
     ZSTDMT_releaseSeq(job->seqPool, rawSeqStore);
     ZSTDMT_releaseCCtx(job->cctxPool, cctx);
     /* report */
-    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
-    job->consumed = job->src.size;
     ZSTD_pthread_cond_signal(&job->job_cond);
-    ZSTD_pthread_mutex_unlock(&job->job_mutex);
 }
 
 
@@ -1119,15 +1123,19 @@ size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
     assert(jobID <= mtctx->nextJobID);
     if (jobID == mtctx->nextJobID) return 0;   /* no active job => nothing to flush */
 
+    /* look into oldest non-fully-flushed job */
     {   unsigned const wJobID = jobID & mtctx->jobIDMask;
-        ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
+        ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID];
         ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
         {   size_t const cResult = jobPtr->cSize;
             size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
             size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
             assert(flushed <= produced);
             toFlush = produced - flushed;
-            if (toFlush==0) assert(jobPtr->consumed < jobPtr->src.size);   /* if toFlush==0, doneJobID should still be active: if doneJobID is completed and fully flushed, ZSTDMT_flushProduced() should have already moved to next job */
+            if (toFlush==0 && (jobPtr->consumed >= jobPtr->src.size)) {
+                /* doneJobID is not-fully-flushed, but toFlush==0 : doneJobID should be compressing some more data */
+                assert(jobPtr->consumed < jobPtr->src.size);
+            }
         }
         ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
     }