From: Yann Collet Date: Fri, 26 Jan 2018 18:44:09 +0000 (-0800) Subject: zstdmt : fixed memory leak X-Git-Tag: v1.3.4~1^2~67^2~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fca13c6855d28d6ba8628794465372cb6088442d;p=thirdparty%2Fzstd.git zstdmt : fixed memory leak writeLastEmptyBlock() must release srcBuffer as mtctx assumes it's done by job worker. minor : changed 2 job member names (src->srcBuffer, srcStart->prefixStart) for clarity --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 4e53367fd..9fea4969f 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -304,19 +304,19 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) /* ------------------------------------------ */ typedef struct { - size_t consumed; /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */ - size_t cSize; /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */ + size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ + size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) worker */ ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) worker */ ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) worker */ ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) worker */ - buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */ - buffer_t src; /* set by mtctx, then modified by worker => no barrier */ - const void* srcStart; /* set by mtctx, then read by worker => no barrier */ - size_t prefixSize; /* set by mtctx, then read by worker => no barrier */ - size_t srcSize; /* set by mtctx, then read by worker => no barrier */ - unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */ - unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */ + buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */ + buffer_t srcBuff; /* set by mtctx, then released by worker => no barrier */ + const void* prefixStart; /* set by mtctx, then read by worker => no barrier */ + size_t prefixSize; /* set by mtctx, then read by worker => no barrier */ + size_t srcSize; /* set by mtctx, then read by worker => no barrier */ + unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */ + unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */ ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */ const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */ unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */ @@ -329,7 +329,7 @@ void ZSTDMT_compressChunk(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); - const void* const src = (const char*)job->srcStart + job->prefixSize; + const void* const src = (const char*)job->prefixStart + job->prefixSize; buffer_t dstBuff = job->dstBuff; /* ressources */ @@ -362,7 +362,7 @@ void ZSTDMT_compressChunk(void* jobDescription) goto _endJob; } } { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, - job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ + job->prefixStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ NULL, /*cdict*/ jobParams, pledgedSrcSize); if (ZSTD_isError(initError)) { @@ -419,8 +419,8 @@ void ZSTDMT_compressChunk(void* jobDescription) _endJob: /* release resources */ ZSTDMT_releaseCCtx(job->cctxPool, cctx); - ZSTDMT_releaseBuffer(job->bufPool, job->src); - job->src = g_nullBuffer; job->srcStart = NULL; + ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff); + job->srcBuff = g_nullBuffer; job->prefixStart = NULL; /* report */ ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); job->consumed = job->srcSize; @@ -557,9 +557,9 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); mtctx->jobs[jobID].dstBuff = g_nullBuffer; - DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].src.start); - ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].src); - mtctx->jobs[jobID].src = g_nullBuffer; + DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].srcBuff.start); + ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].srcBuff); + mtctx->jobs[jobID].srcBuff = g_nullBuffer; } memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription)); DEBUGLOG(4, "input: release address %08X", (U32)(size_t)mtctx->inBuff.buffer.start); @@ -757,8 +757,8 @@ static size_t ZSTDMT_compress_advanced_internal( buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer; size_t dictSize = u ? overlapSize : 0; - mtctx->jobs[u].src = g_nullBuffer; - mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize; + mtctx->jobs[u].srcBuff = g_nullBuffer; + mtctx->jobs[u].prefixStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].prefixSize = dictSize; mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0); /* avoid job.srcSize == 0 */ mtctx->jobs[u].consumed = 0; @@ -781,7 +781,7 @@ static size_t ZSTDMT_compress_advanced_internal( } DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)chunkSize); - DEBUG_PRINTHEX(6, mtctx->jobs[u].srcStart, 12); + DEBUG_PRINTHEX(6, mtctx->jobs[u].prefixStart, 12); POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); frameStartPos += chunkSize; @@ -802,7 +802,7 @@ static size_t ZSTDMT_compress_advanced_internal( ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex); DEBUGLOG(5, "ready to write chunk %u ", chunkID); - mtctx->jobs[chunkID].srcStart = NULL; + mtctx->jobs[chunkID].prefixStart = NULL; { size_t const cSize = mtctx->jobs[chunkID].cSize; if (ZSTD_isError(cSize)) error = cSize; if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); @@ -999,18 +999,21 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) { */ static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) { - assert(job->srcSize == 0); assert(job->lastChunk == 1); - assert(job->firstChunk == 0); /* first chunk needs to create frame header too */ - assert(job->dstBuff.start == NULL); /* invoked from streaming variant only */ - { buffer_t const dstBuff = ZSTDMT_getBuffer(job->bufPool); - if (dstBuff.start==NULL) return ERROR(memory_allocation); - job->dstBuff = dstBuff; /* will be released by ZSTDMT_flushProduced() */ - assert(dstBuff.size >= ZSTD_blockHeaderSize); - job->cSize = ZSTD_writeLastEmptyBlock(dstBuff.start, dstBuff.size); - assert(!ZSTD_isError(job->cSize)); - assert(job->consumed == 0); - } + assert(job->srcSize == 0); /* last chunk is empty -> will be simplified into a last empty block */ + assert(job->firstChunk == 0); /* cannot be first chunk, as it also needs to create frame header */ + /* A job created by streaming variant starts with a src buffer, but no dst buffer. + * It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx. + * When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx. + * This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */ + assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */ + assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */ + assert(job->srcBuff.size >= ZSTD_blockHeaderSize); + job->dstBuff = job->srcBuff; + job->srcBuff = g_nullBuffer; + job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.size); + assert(!ZSTD_isError(job->cSize)); + assert(job->consumed == 0); return 0; } @@ -1028,12 +1031,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS if (!mtctx->jobReady) { DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize); - mtctx->jobs[jobID].src = mtctx->inBuff.buffer; - mtctx->jobs[jobID].srcStart = mtctx->inBuff.buffer.start; + mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer; + mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start; + mtctx->jobs[jobID].prefixSize = mtctx->prefixSize; mtctx->jobs[jobID].srcSize = srcSize; mtctx->jobs[jobID].consumed = 0; mtctx->jobs[jobID].cSize = 0; - mtctx->jobs[jobID].prefixSize = mtctx->prefixSize; assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize); mtctx->jobs[jobID].params = mtctx->params; /* do not calculate checksum within sections, but write it in header for first section */ @@ -1066,7 +1069,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS } mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize; memmove(mtctx->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */ - (const char*)mtctx->jobs[jobID].srcStart + mtctx->prefixSize + srcSize - newPrefixSize, + (const char*)mtctx->jobs[jobID].prefixStart + mtctx->prefixSize + srcSize - newPrefixSize, mtctx->inBuff.filled); mtctx->prefixSize = newPrefixSize; } else { /* endFrame==1 => no need for another input buffer */