From: Yann Collet Date: Fri, 19 Jan 2018 00:20:26 +0000 (-0800) Subject: fixed frame checksum issue X-Git-Tag: v1.3.4~1^2~67^2~27 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6f7280fb33f5717f2f30e2c19b57512a96325bd8;p=thirdparty%2Fzstd.git fixed frame checksum issue and race conditions --- diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index e8840be97..a87c368ea 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -2396,7 +2396,7 @@ static size_t ZSTD_writeEpilogue(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity) BYTE* op = ostart; size_t fhSize = 0; - DEBUGLOG(5, "ZSTD_writeEpilogue"); + DEBUGLOG(4, "ZSTD_writeEpilogue"); if (cctx->stage == ZSTDcs_created) return ERROR(stage_wrong); /* init missing */ /* special case : empty frame */ @@ -2420,6 +2420,7 @@ static size_t ZSTD_writeEpilogue(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity) if (cctx->appliedParams.fParams.checksumFlag) { U32 const checksum = (U32) XXH64_digest(&cctx->xxhState); if (dstCapacity<4) return ERROR(dstSize_tooSmall); + DEBUGLOG(4, "ZSTD_writeEpilogue: write checksum : %08X", checksum); MEM_writeLE32(op, checksum); op += 4; } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index ffcbdf5f8..1aa6b866f 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -315,7 +315,7 @@ typedef struct { unsigned firstChunk; unsigned lastChunk; unsigned jobCompleted; - unsigned checksumWritten; + unsigned frameChecksumNeeded; ZSTD_pthread_mutex_t* jobCompleted_mutex; ZSTD_pthread_cond_t* jobCompleted_cond; ZSTD_CCtx_params params; @@ -1019,7 +1019,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); zcs->jobs[jobID].lastChunk = endFrame; zcs->jobs[jobID].jobCompleted = 0; - zcs->jobs[jobID].checksumWritten = 0; + zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag; zcs->jobs[jobID].dstFlushed = 0; zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; @@ -1065,22 +1065,25 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi } -/* ZSTDMT_flushNextJob() : - * output : will be updated with amount of data flushed . - * blockToFlush : if >0, the function will block and wait if there is no data available to flush . - * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */ +/*! ZSTDMT_flushNextJob() : + * `output` : will be updated with amount of data flushed . + * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush . + * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; DEBUGLOG(2, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush); - if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ + if (zcs->doneJobID == zcs->nextJobID) { + DEBUGLOG(2, "ZSTDMT_flushNextJob: doneJobID==nextJobID : nothing to flush !") + return 0; /* all flushed ! */ + } ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) { + if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */ + if (zcs->jobs[wJobID].jobCompleted==1) break; DEBUGLOG(2, "waiting for something to flush from job %u (currently flushed: %u bytes)", zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed); - assert(zcs->jobs[wJobID].jobCompleted==0); - if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */ - ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */ + ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */ } /* some output is available to be flushed */ @@ -1094,16 +1097,14 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi return job.cSize; } /* add frame checksum if necessary */ - if ( zcs->frameEnded - && (zcs->doneJobID+1 == zcs->nextJobID) - && (zcs->params.fParams.checksumFlag) - && (!job.checksumWritten) ) { + if ( job.jobCompleted + && job.frameChecksumNeeded ) { U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum); MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); job.cSize += 4; zcs->jobs[wJobID].cSize += 4; - zcs->jobs[wJobID].checksumWritten = 1; + zcs->jobs[wJobID].frameChecksumNeeded = 0; } assert(job.cSize >= job.dstFlushed); { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); @@ -1114,19 +1115,20 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi } if ( job.jobCompleted && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */ + DEBUGLOG(2, "Job %u completed, moving to next one", zcs->doneJobID); ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer; zcs->jobs[wJobID].jobCompleted = 0; - zcs->doneJobID++; zcs->consumed += job.srcSize; zcs->produced += job.cSize; + zcs->doneJobID++; } else { zcs->jobs[wJobID].dstFlushed = job.dstFlushed; } - /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */ + /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */ if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); - if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some buffer to flush */ - zcs->allJobsCompleted = zcs->frameEnded; /* frame completed and entirely flushed */ + if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some more buffer to flush */ + zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */ return 0; /* everything flushed */ } } diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 5cd1ea0fb..c26cb3295 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -1346,8 +1346,12 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp outBuff.size = outBuff.pos + dstBuffSize; DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize); decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); + if (ZSTD_getErrorCode(decompressionResult) == ZSTD_error_corruption_detected) { + DISPLAY("ZSTD_decompressStream: checksum error : \n"); + findDiff(copyBuffer, dstBuffer, totalTestSize); + } CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult)); - DISPLAYLEVEL(6, "inBuff.pos = %u \n", (U32)readCSrcSize); + DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u \n", (U32)inBuff.pos); } CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize); CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);