From: Yann Collet Date: Wed, 19 Sep 2018 23:30:55 +0000 (-0700) Subject: fixed minor reporting discrepancy in MT mode X-Git-Tag: v1.3.6^2~19^2~8 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6b07a66aecbfff057397257456b69a148919e37e;p=thirdparty%2Fzstd.git fixed minor reporting discrepancy in MT mode --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 39255fdcf..a7e93dacd 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -712,7 +712,12 @@ void ZSTDMT_compressionJob(void* jobDescription) assert(job->cSize == 0); for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) { size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize); - if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } + if (ZSTD_isError(cSize)) { + ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); + job->cSize = cSize; + ZSTD_pthread_mutex_unlock(&job->job_mutex); + goto _endJob; + } ip += chunkSize; op += cSize; assert(op < oend); /* stats */ @@ -725,7 +730,8 @@ void ZSTDMT_compressionJob(void* jobDescription) ZSTD_pthread_mutex_unlock(&job->job_mutex); } /* last block */ - assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */ + assert(chunkSize > 0); + assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */ if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) { size_t const lastBlockSize1 = job->src.size & (chunkSize-1); size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1; @@ -736,6 +742,7 @@ void ZSTDMT_compressionJob(void* jobDescription) /* stats */ ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); job->cSize += cSize; + job->consumed = job->src.size; ZSTD_pthread_mutex_unlock(&job->job_mutex); } } @@ -748,10 +755,7 @@ _endJob: ZSTDMT_releaseSeq(job->seqPool, rawSeqStore); ZSTDMT_releaseCCtx(job->cctxPool, cctx); /* report */ - ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); - job->consumed = job->src.size; ZSTD_pthread_cond_signal(&job->job_cond); - ZSTD_pthread_mutex_unlock(&job->job_mutex); } @@ -1119,15 +1123,19 @@ size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx) assert(jobID <= mtctx->nextJobID); if (jobID == mtctx->nextJobID) return 0; /* no active job => nothing to flush */ + /* look into oldest non-fully-flushed job */ { unsigned const wJobID = jobID & mtctx->jobIDMask; - ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID]; + ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID]; ZSTD_pthread_mutex_lock(&jobPtr->job_mutex); { size_t const cResult = jobPtr->cSize; size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed; assert(flushed <= produced); toFlush = produced - flushed; - if (toFlush==0) assert(jobPtr->consumed < jobPtr->src.size); /* if toFlush==0, doneJobID should still be active: if doneJobID is completed and fully flushed, ZSTDMT_flushProduced() should have already moved to next job */ + if (toFlush==0 && (jobPtr->consumed >= jobPtr->src.size)) { + /* doneJobID is not-fully-flushed, but toFlush==0 : doneJobID should be compressing some more data */ + assert(jobPtr->consumed < jobPtr->src.size); + } } ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); }