From: Yann Collet
Date: Fri, 26 Jan 2018 20:15:43 +0000 (-0800)
Subject: zstdmt : flush() only locks to read shared job members
X-Git-Tag: v1.3.4~1^2~67^2~9
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=79b6e28b0a168bb076dbaf653b1a7d71bbd1a201;p=thirdparty%2Fzstd.git

zstdmt : flush() only locks to read shared job members

Other job members are accessed directly.
This avoids a full job copy, which would access everything,
including a few members that are supposed to be used by the worker only,
needlessly requiring additional locks to avoid race conditions.
---

diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c
index 7b37c5b33..c7df32d37 100644
--- a/lib/compress/zstdmt_compress.c
+++ b/lib/compress/zstdmt_compress.c
@@ -309,16 +309,16 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 typedef struct {
     size_t consumed;          /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
-    size_t cSize;             /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
-    ZSTD_pthread_mutex_t* mtctx_mutex;  /* Thread-safe - used by mtctx and (all) worker */
-    ZSTD_pthread_cond_t* mtctx_cond;    /* Thread-safe - used by mtctx and (all) worker */
-    ZSTDMT_CCtxPool* cctxPool;          /* Thread-safe - used by mtctx and (all) worker */
-    ZSTDMT_bufferPool* bufPool;         /* Thread-safe - used by mtctx and (all) worker */
-    buffer_t dstBuff;         /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */
+    size_t cSize;             /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
+    ZSTD_pthread_mutex_t* mtctx_mutex;  /* Thread-safe - used by mtctx and (all) workers */
+    ZSTD_pthread_cond_t* mtctx_cond;    /* Thread-safe - used by mtctx and (all) workers */
+    ZSTDMT_CCtxPool* cctxPool;          /* Thread-safe - used by mtctx and (all) workers */
+    ZSTDMT_bufferPool* bufPool;         /* Thread-safe - used by mtctx and (all) workers */
+    buffer_t dstBuff;         /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
     buffer_t srcBuff;         /* set by mtctx, then released by worker => no barrier */
-    const void* prefixStart;  /* set by mtctx, then read by worker => no barrier */
+    const void* prefixStart;  /* set by mtctx, then read and set0 by worker => no barrier */
     size_t prefixSize;        /* set by mtctx, then read by worker => no barrier */
-    size_t srcSize;           /* set by mtctx, then read by worker => no barrier */
+    size_t srcSize;           /* set by mtctx, then read by worker & mtctx => no barrier */
     unsigned firstChunk;      /* set by mtctx, then read by worker => no barrier */
     unsigned lastChunk;       /* set by mtctx, then read by worker => no barrier */
     ZSTD_CCtx_params params;  /* set by mtctx, then read by worker => no barrier */
@@ -341,15 +341,13 @@ void ZSTDMT_compressChunk(void* jobDescription)
         job->cSize = ERROR(memory_allocation);
         goto _endJob;
     }
-    if (dstBuff.start == NULL) {
+    if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
         dstBuff = ZSTDMT_getBuffer(job->bufPool);
         if (dstBuff.start==NULL) {
             job->cSize = ERROR(memory_allocation);
             goto _endJob;
         }
-        ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
         job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
-        ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
     }

     /* init */
@@ -1087,6 +1085,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
     if ( (srcSize == 0) && (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) {
+        DEBUGLOG(5,
"ZSTDMT_createCompressionJob: creating a last empty block to end frame"); assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */ ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID); mtctx->nextJobID++; @@ -1094,12 +1093,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS } } - DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", + DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u, jobNb == %u (mod:%u))", mtctx->nextJobID, (U32)mtctx->jobs[jobID].srcSize, mtctx->jobs[jobID].lastChunk, - mtctx->doneJobID, - mtctx->doneJobID & mtctx->jobIDMask); + mtctx->nextJobID, + jobID); if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) { mtctx->nextJobID++; mtctx->jobReady = 0; @@ -1118,15 +1117,17 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end) { unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask; - DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush); + DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u , job %u <= %u)", + blockToFlush, mtctx->doneJobID, mtctx->nextJobID); assert(output->size >= output->pos); ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex); - if (blockToFlush && (mtctx->doneJobID < mtctx->nextJobID)) { + if ( blockToFlush + && (mtctx->doneJobID < mtctx->nextJobID) ) { assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize); - while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) { + while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) { /* nothing to flush */ if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].srcSize) { - DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond", + DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond, there will be none", mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].srcSize); break; } @@ -1135,60 +1136,60 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); /* block when nothing available to flush but more to come */ } } - /* some output is available to be flushed */ - { ZSTDMT_jobDescription job = mtctx->jobs[wJobID]; + /* try to flush something */ + { size_t cSize = mtctx->jobs[wJobID].cSize; + size_t const srcConsumed = mtctx->jobs[wJobID].consumed; ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); - if (ZSTD_isError(job.cSize)) { + if (ZSTD_isError(cSize)) { DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", - mtctx->doneJobID, ZSTD_getErrorName(job.cSize)); + mtctx->doneJobID, ZSTD_getErrorName(cSize)); ZSTDMT_waitForAllJobsCompleted(mtctx); ZSTDMT_releaseAllJobResources(mtctx); - return job.cSize; + return cSize; } /* add frame checksum if necessary (can only happen once) */ - assert(job.consumed <= job.srcSize); - if ( (job.consumed == job.srcSize) - && job.frameChecksumNeeded ) { + assert(srcConsumed <= mtctx->jobs[wJobID].srcSize); + if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed -> worker no longer active */ + && mtctx->jobs[wJobID].frameChecksumNeeded ) { U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState); DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum); - MEM_writeLE32((char*)job.dstBuff.start + 
job.cSize, checksum); - job.cSize += 4; - mtctx->jobs[wJobID].cSize += 4; + MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum); + cSize += 4; + mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */ mtctx->jobs[wJobID].frameChecksumNeeded = 0; } - assert(job.cSize >= job.dstFlushed); - if (job.dstBuff.start != NULL) { /* dst buffer present : some work is ongoing or completed */ - size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)", - (U32)toWrite, mtctx->doneJobID, (double)job.consumed / (job.srcSize + !job.srcSize /*avoid div0*/) * 100); - memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); + if (cSize > 0) { /* compression is ongoing or completed */ + size_t const toWrite = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); + DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u)", + (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize); + assert(cSize >= mtctx->jobs[wJobID].dstFlushed); + memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toWrite); output->pos += toWrite; - job.dstFlushed += toWrite; + mtctx->jobs[wJobID].dstFlushed += toWrite; /* can write : this value is only used by mtctx */ - if ( (job.consumed == job.srcSize) /* job completed */ - && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => free this job position */ + if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed */ + && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", - mtctx->doneJobID, (U32)job.dstFlushed); - assert(job.srcBuff.start == NULL); /* srcBuff supposed already released */ - ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff); + mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); + assert(mtctx->jobs[wJobID].srcBuff.start == NULL); /* srcBuff supposed already released */ + ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); mtctx->jobs[wJobID].dstBuff = g_nullBuffer; - mtctx->consumed += job.srcSize; - mtctx->produced += job.cSize; + mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ + mtctx->consumed += mtctx->jobs[wJobID].srcSize; + mtctx->produced += cSize; mtctx->doneJobID++; - } else { - mtctx->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */ } } /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */ - if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); - if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */ + if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed); + if (mtctx->jobs[wJobID].srcSize > srcConsumed) return 1; /* current job not completely compressed */ } - if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs to flush */ - if (mtctx->jobReady) return 1; /* one job is ready and queued! 
*/ - if (mtctx->inBuff.filled > 0) return 1; /* input not empty */ - mtctx->allJobsCompleted = mtctx->frameEnded; /* last frame entirely flushed */ - if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */ - return 0; /* everything flushed */ + if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */ + if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */ + if (mtctx->inBuff.filled > 0) return 1; /* input is not empty, and still needs to be converted into a job */ + mtctx->allJobsCompleted = mtctx->frameEnded; /* all chunks are entirely flushed => if this one is last one, frame is completed */ + if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? instead of : are internal buffers fully flushed ? */ + return 0; /* internal buffers fully flushed */ } @@ -1217,10 +1218,10 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } /* single-pass shortcut (note : synchronous-mode) */ - if ( (mtctx->nextJobID == 0) /* just started */ - && (mtctx->inBuff.filled == 0) /* nothing buffered */ - && (!mtctx->jobReady) /* no job already created */ - && (endOp == ZSTD_e_end) /* end order */ + if ( (mtctx->nextJobID == 0) /* just started */ + && (mtctx->inBuff.filled == 0) /* nothing buffered */ + && (!mtctx->jobReady) /* no job already created */ + && (endOp == ZSTD_e_end) /* end order */ && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */ size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx, (char*)output->dst + output->pos, output->size - output->pos,
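
The core pattern in the new ZSTDMT_flushProduced above can be boiled down to a
few lines: take the mutex only long enough to snapshot the job members the
worker may be writing concurrently (consumed, cSize), release it, then operate
on the snapshots; members only ever touched by the flushing thread
(dstFlushed) or fixed before the worker started (srcSize) are accessed
directly, with no lock. The sketch below illustrates just that discipline; it
is not the real zstdmt API: the names job_t, ctx_t and flush_produced are made
up for the example, and it uses plain pthreads rather than zstd's
ZSTD_pthread_* wrappers.

#include <pthread.h>
#include <stddef.h>

typedef struct {
    size_t consumed;    /* SHARED: written by worker, read by flusher */
    size_t cSize;       /* SHARED: written by worker, read by flusher */
    size_t srcSize;     /* set before the worker starts => no barrier */
    size_t dstFlushed;  /* flusher-only => no lock needed */
} job_t;

typedef struct {
    pthread_mutex_t mutex;
    job_t job;
} ctx_t;

/* Returns the number of produced-but-unflushed bytes. */
static size_t flush_produced(ctx_t* ctx)
{
    size_t cSize, srcConsumed;

    /* lock only to read the members the worker may be writing */
    pthread_mutex_lock(&ctx->mutex);
    cSize = ctx->job.cSize;
    srcConsumed = ctx->job.consumed;
    pthread_mutex_unlock(&ctx->mutex);

    if (srcConsumed == ctx->job.srcSize) {
        /* job completed => worker no longer active; even the shared
         * members could now be read (or reset) directly */
    }
    return cSize - ctx->job.dstFlushed;  /* flusher-private member */
}

int main(void)
{
    ctx_t ctx;
    pthread_mutex_init(&ctx.mutex, NULL);
    ctx.job.consumed   = 64;   /* pretend: worker consumed all input... */
    ctx.job.srcSize    = 64;
    ctx.job.cSize      = 100;  /* ...and produced 100 bytes */
    ctx.job.dstFlushed = 0;    /* nothing flushed yet */
    return flush_produced(&ctx) == 100 ? 0 : 1;
}

Compared with the previous approach of copying the whole job under the lock,
only the genuinely shared, concurrently-written members pay for
synchronization; worker-only members never need to be protected just so that
the flusher's copy of them stays race-free.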