From: Yann Collet Date: Fri, 26 Jan 2018 22:35:54 +0000 (-0800) Subject: zstdmt: job table correctly cleaned after synchronous ZSTDMT_compress() X-Git-Tag: v1.3.4~1^2~67^2~7 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=27c5853c4285fa678fb9f606a87d21852b533ac0;p=thirdparty%2Fzstd.git zstdmt: job table correctly cleaned after synchronous ZSTDMT_compress() --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 3bf585d49..d78257b2e 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -559,6 +559,7 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); mtctx->jobs[jobID].dstBuff = g_nullBuffer; + mtctx->jobs[jobID].cSize = 0; DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].srcBuff.start); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].srcBuff); mtctx->jobs[jobID].srcBuff = g_nullBuffer; @@ -816,6 +817,7 @@ static size_t ZSTDMT_compress_advanced_internal( ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[chunkID].dstBuff); } } mtctx->jobs[chunkID].dstBuff = g_nullBuffer; + mtctx->jobs[chunkID].cSize = 0; dstPos += cSize ; } } /* for (chunkID=0; chunkIDjobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */ mtctx->jobs[wJobID].frameChecksumNeeded = 0; } - if (cSize > 0) { /* compression is ongoing or completed */ + if (cSize > 0) { /* compression is ongoing or completed */ size_t const toWrite = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); - DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u)", - (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize); + DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)", + (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize, (U32)cSize); + assert(mtctx->doneJobID < mtctx->nextJobID); assert(cSize >= mtctx->jobs[wJobID].dstFlushed); + assert(mtctx->jobs[wJobID].dstBuff.start != NULL); memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toWrite); output->pos += toWrite; mtctx->jobs[wJobID].dstFlushed += toWrite; /* can write : this value is only used by mtctx */ @@ -1204,7 +1208,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, { size_t const newJobThreshold = mtctx->inBuff.prefixSize + mtctx->targetSectionSize; unsigned forwardInputProgress = 0; - DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u)", (U32)endOp); + DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)", + (U32)endOp, (U32)(input->size - input->pos)); assert(output->pos <= output->size); assert(input->pos <= input->size);