From: Yann Collet Date: Thu, 25 Jan 2018 22:52:34 +0000 (-0800) Subject: zstdmt:: renamed mutex and cond to underline they are context-global X-Git-Tag: v1.3.4~1^2~67^2~15 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1272d8e760eb8d6ad42cbc2fe1ec898a26b815f5;p=thirdparty%2Fzstd.git zstdmt:: renamed mutex and cond to underline they are context-global --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 0207920dc..bc9209d5b 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -316,8 +316,8 @@ typedef struct { unsigned lastChunk; unsigned jobCompleted; unsigned frameChecksumNeeded; - ZSTD_pthread_mutex_t* jobCompleted_mutex; - ZSTD_pthread_cond_t* jobCompleted_cond; + ZSTD_pthread_mutex_t* mtctx_mutex; + ZSTD_pthread_cond_t* mtctx_cond; ZSTD_CCtx_params params; const ZSTD_CDict* cdict; ZSTDMT_CCtxPool* cctxPool; @@ -344,9 +344,9 @@ void ZSTDMT_compressChunk(void* jobDescription) job->cSize = ERROR(memory_allocation); goto _endJob; } - ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */ + ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); job->dstBuff = dstBuff; - ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); + ZSTD_pthread_mutex_unlock(job->mtctx_mutex); } /* init */ @@ -399,13 +399,13 @@ void ZSTDMT_compressChunk(void* jobDescription) ip += ZSTD_BLOCKSIZE_MAX; op += cSize; assert(op < oend); /* stats */ - ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */ + ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); /* note : it's a mtctx mutex */ job->cSize += cSize; job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb; DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)", (U32)cSize, (U32)job->cSize); - ZSTD_pthread_cond_signal(job->jobCompleted_cond); - ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); + ZSTD_pthread_cond_signal(job->mtctx_cond); + ZSTD_pthread_mutex_unlock(job->mtctx_mutex); } /* last block */ if ((nbBlocks > 0) | job->lastChunk /*must output a "last block" flag*/ ) { @@ -416,10 +416,10 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } /* stats */ - ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */ + ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); /* note : it's a mtctx mutex */ job->cSize += cSize; job->consumed = job->srcSize; - ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); + ZSTD_pthread_mutex_unlock(job->mtctx_mutex); } } #endif @@ -430,11 +430,11 @@ _endJob: ZSTDMT_releaseBuffer(job->bufPool, job->src); job->src = g_nullBuffer; job->srcStart = NULL; /* report */ - ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); + ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); job->consumed = job->srcSize; job->jobCompleted = 1; - ZSTD_pthread_cond_signal(job->jobCompleted_cond); - ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); + ZSTD_pthread_cond_signal(job->mtctx_cond); + ZSTD_pthread_mutex_unlock(job->mtctx_mutex); } @@ -452,8 +452,8 @@ struct ZSTDMT_CCtx_s { ZSTDMT_jobDescription* jobs; ZSTDMT_bufferPool* bufPool; ZSTDMT_CCtxPool* cctxPool; - ZSTD_pthread_mutex_t jobCompleted_mutex; - ZSTD_pthread_cond_t jobCompleted_cond; + ZSTD_pthread_mutex_t mtctx_mutex; + ZSTD_pthread_cond_t mtctx_cond; ZSTD_CCtx_params params; size_t targetSectionSize; size_t inBuffSize; @@ -538,11 +538,11 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) ZSTDMT_freeCCtx(mtctx); return NULL; } - if (ZSTD_pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL)) { + if (ZSTD_pthread_mutex_init(&mtctx->mtctx_mutex, NULL)) { ZSTDMT_freeCCtx(mtctx); return NULL; } - if (ZSTD_pthread_cond_init(&mtctx->jobCompleted_cond, NULL)) { + if (ZSTD_pthread_cond_init(&mtctx->mtctx_cond, NULL)) { ZSTDMT_freeCCtx(mtctx); return NULL; } @@ -582,12 +582,12 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted"); while (zcs->doneJobID < zcs->nextJobID) { unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; - ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); + ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex); while (zcs->jobs[jobID].jobCompleted==0) { DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */ - ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); + ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex); } - ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); + ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex); zcs->doneJobID++; } } @@ -601,8 +601,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) ZSTDMT_freeBufferPool(mtctx->bufPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool); ZSTD_freeCDict(mtctx->cdictLocal); - ZSTD_pthread_mutex_destroy(&mtctx->jobCompleted_mutex); - ZSTD_pthread_cond_destroy(&mtctx->jobCompleted_cond); + ZSTD_pthread_mutex_destroy(&mtctx->mtctx_mutex); + ZSTD_pthread_cond_destroy(&mtctx->mtctx_cond); ZSTD_free(mtctx, mtctx->cMem); return 0; } @@ -672,7 +672,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) { ZSTD_frameProgression fs; DEBUGLOG(6, "ZSTDMT_getFrameProgression"); - ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex); + ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex); fs.consumed = mtctx->consumed; fs.produced = mtctx->produced; assert(mtctx->inBuff.filled >= mtctx->prefixSize); @@ -690,7 +690,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) fs.produced += produced; } } - ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex); + ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); return fs; } @@ -783,8 +783,8 @@ static size_t ZSTDMT_compress_advanced_internal( mtctx->jobs[u].firstChunk = (u==0); mtctx->jobs[u].lastChunk = (u==nbChunks-1); mtctx->jobs[u].jobCompleted = 0; - mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; - mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; + mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex; + mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond; if (params.fParams.checksumFlag) { XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize); @@ -804,12 +804,12 @@ static size_t ZSTDMT_compress_advanced_internal( unsigned chunkID; for (chunkID=0; chunkIDjobCompleted_mutex); + ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex); while (mtctx->jobs[chunkID].jobCompleted==0) { DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID); - ZSTD_pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex); + ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); } - ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex); + ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); DEBUGLOG(5, "ready to write chunk %u ", chunkID); mtctx->jobs[chunkID].srcStart = NULL; @@ -1035,8 +1035,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD zcs->jobs[jobID].jobCompleted = 0; zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag; zcs->jobs[jobID].dstFlushed = 0; - zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; - zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; + zcs->jobs[jobID].mtctx_mutex = &zcs->mtctx_mutex; + zcs->jobs[jobID].mtctx_cond = &zcs->mtctx_cond; if (zcs->params.fParams.checksumFlag) XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize); @@ -1067,7 +1067,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD zcs->params.fParams.checksumFlag = 0; } } } - DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", + DEBUGLOG(2, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, @@ -1094,18 +1094,18 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, uns DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush); assert(output->size >= output->pos); - ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); + ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex); if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) { while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) { if (zcs->jobs[wJobID].jobCompleted==1) break; DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)", zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed); - ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */ + ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex); /* block when nothing available to flush but more to come */ } } /* some output is available to be flushed */ { ZSTDMT_jobDescription job = zcs->jobs[wJobID]; - ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); + ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex); if (ZSTD_isError(job.cSize)) { DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", zcs->doneJobID, ZSTD_getErrorName(job.cSize)); @@ -1186,8 +1186,9 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, /* single-pass shortcut (note : synchronous-mode) */ if ( (mtctx->nextJobID == 0) /* just started */ && (mtctx->inBuff.filled == 0) /* nothing buffered */ + && (!mtctx->jobReady) /* no job already created */ && (endOp == ZSTD_e_end) /* end order */ - && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */ + && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */ size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx, (char*)output->dst + output->pos, output->size - output->pos, (const char*)input->src + input->pos, input->size - input->pos, @@ -1234,7 +1235,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, /* check for potential compressed data ready to be flushed */ { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */ - if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */ + if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */ return remainingToFlush; } }