From: Yann Collet Date: Sat, 20 Jan 2018 01:35:08 +0000 (-0800) Subject: zstdmt : finally vanquished an elusive and rare race condition X-Git-Tag: v1.3.4~1^2~67^2~23 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3ad7d4951c9c54b9709935ad54c9fb4e81b940f2;p=thirdparty%2Fzstd.git zstdmt : finally vanquished an elusive and rare race condition --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 5b253f162..3e665f574 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1003,9 +1003,11 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi { unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; - unsigned const limitID = zcs->doneJobID & zcs->jobIDMask; - if ((zcs->doneJobID < zcs->nextJobID) & (jobID == limitID)) - return 0; /* new job would overwrite unflushed older job */ + if (zcs->nextJobID > zcs->doneJobID + zcs->jobIDMask) { + DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full"); + assert((zcs->nextJobID & zcs->jobIDMask) == (zcs->doneJobID & zcs->jobIDMask)); + return 0; + } if (!zcs->jobReady) { DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", @@ -1079,76 +1081,78 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi } -/*! ZSTDMT_flushNextJob() : - * `output` : will be updated with amount of data flushed . +/*! ZSTDMT_flushProduced() : + * `output` : `pos` will be updated with amount of data flushed . * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush . * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */ -static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) +static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; - DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush); - if (zcs->doneJobID == zcs->nextJobID) { - DEBUGLOG(5, "ZSTDMT_flushNextJob: doneJobID(%u)==(%u)nextJobID : nothing to flush !", - zcs->doneJobID, zcs->nextJobID) - return 0; /* all flushed ! */ - } + DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush); + assert(output->size >= output->pos); + ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); - while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) { - if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */ - if (zcs->jobs[wJobID].jobCompleted==1) break; - DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)", - zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed); - ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */ - } + if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) { + while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) { + if (zcs->jobs[wJobID].jobCompleted==1) break; + DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)", + zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed); + ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */ + } } /* some output is available to be flushed */ { ZSTDMT_jobDescription job = zcs->jobs[wJobID]; ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); if (ZSTD_isError(job.cSize)) { - DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s", + DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", zcs->doneJobID, ZSTD_getErrorName(job.cSize)); ZSTDMT_waitForAllJobsCompleted(zcs); ZSTDMT_releaseAllJobResources(zcs); return job.cSize; } - /* add frame checksum if necessary */ + /* add frame checksum if necessary (can only happen once) */ if ( job.jobCompleted && job.frameChecksumNeeded ) { U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); - DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum); + DEBUGLOG(5, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum); MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); job.cSize += 4; zcs->jobs[wJobID].cSize += 4; zcs->jobs[wJobID].frameChecksumNeeded = 0; } assert(job.cSize >= job.dstFlushed); - { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u (completion:%.1f%%)", + if (job.dstBuff.start != NULL) { /* one buffer present : some job is ongoing */ + size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); + DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)", (U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100); memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); output->pos += toWrite; job.dstFlushed += toWrite; - } - if ( job.jobCompleted - && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */ - DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", - zcs->doneJobID, (U32)job.dstFlushed); - ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff); - zcs->jobs[wJobID].dstBuff = g_nullBuffer; - zcs->jobs[wJobID].jobCompleted = 0; - zcs->consumed += job.srcSize; - zcs->produced += job.cSize; - zcs->doneJobID++; - } else { - zcs->jobs[wJobID].dstFlushed = job.dstFlushed; - } + + if ( job.jobCompleted + && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */ + DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", + zcs->doneJobID, (U32)job.dstFlushed); + ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff); + zcs->jobs[wJobID].dstBuff = g_nullBuffer; + zcs->jobs[wJobID].jobCompleted = 0; + zcs->consumed += job.srcSize; + zcs->produced += job.cSize; + zcs->doneJobID++; + } else { + zcs->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */ + } } + /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */ if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); - if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some more buffer to flush */ - if (zcs->jobReady) return 1; /* some more work to do ! */ - zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */ - return 0; /* everything flushed */ -} } + if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */ + } + if (zcs->doneJobID < zcs->nextJobID) return 1; /* some more jobs to flush */ + if (zcs->jobReady) return 1; /* at least one more job to do ! */ + if (zcs->inBuff.filled > 0) return 1; /* input not empty */ + zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */ + return 0; /* everything flushed */ +} /** ZSTDMT_compressStream_generic() : @@ -1220,7 +1224,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } /* check for potential compressed data ready to be flushed */ - { size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */ + { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */ if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */ return remainingToFlush; } @@ -1244,12 +1248,13 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* ou if ( mtctx->jobReady /* one job ready for a worker to pick up */ || (srcSize > 0) /* still some data within input buffer */ || (endFrame && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */ - DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job"); + DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)", + (U32)srcSize, endFrame); CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) ); } /* check if there is any data available to flush */ - return ZSTDMT_flushNextJob(mtctx, output, 1 /* blockToFlush */); + return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */); } diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 6ec14bb8e..8c66c53bd 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -99,8 +99,8 @@ unsigned int FUZ_rand(unsigned int* seedPtr) if (cond) { \ DISPLAY("Error => "); \ DISPLAY(__VA_ARGS__); \ - DISPLAY(" (seed %u, test nb %u, line %u) \n", \ - seed, testNb, __LINE__); \ + DISPLAY(" (seed %u, test nb %u, line %u (sig %08X) \n", \ + seed, testNb, __LINE__, coreSeed); \ goto _output_error; \ } } @@ -219,6 +219,7 @@ static int basicUnitTests(U32 seed, double compressibility) size_t cSize; int testResult = 0; U32 testNb = 1; + U32 coreSeed = 0; /* just to conform with CHECK_Z macro display */ ZSTD_CStream* zc = ZSTD_createCStream(); ZSTD_DStream* zd = ZSTD_createDStream(); ZSTDMT_CCtx* mtctx = ZSTDMT_createCCtx(2); @@ -958,10 +959,13 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres size_t maxTestSize; /* init */ - if (nbTests >= testNb) { DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); } - else { DISPLAYUPDATE(2, "\r%6u ", testNb); } FUZ_rand(&coreSeed); lseed = coreSeed ^ prime32; + if (nbTests >= testNb) { + DISPLAYUPDATE(2, "\r%6u/%6u (%08X) ", testNb, nbTests, lseed); + } else { + DISPLAYUPDATE(2, "\r%6u (%08X) ", testNb, lseed); + } /* states full reset (deliberately not synchronized) */ /* some issues can only happen when reusing states */ @@ -1171,7 +1175,6 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp UTIL_time_t const startClock = UTIL_getTime(); const BYTE* dict=NULL; /* can keep same dict on 2 consecutive tests */ size_t dictSize = 0; - U32 oldTestLog = 0; int const cLevelMax = bigTests ? (U32)ZSTD_maxCLevel()-1 : g_cLevelMax_smallTests; U32 const nbThreadsMax = bigTests ? 4 : 2; @@ -1193,6 +1196,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp RDG_genBuffer(cNoiseBuffer[4], srcBufferSize, 1.00, 0., coreSeed); /* sparse content */ memset(copyBuffer, 0x65, copyBufferSize); /* make copyBuffer considered initialized */ ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */ + DISPLAYLEVEL(6, "Creating initial context with %u threads \n", nbThreads); /* catch up testNb */ for (testNb=1; testNb < startTest; testNb++) @@ -1205,14 +1209,14 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp size_t totalTestSize, totalGenSize, cSize; XXH64_state_t xxhState; U64 crcOrig; - U32 resetAllowed = 1; size_t maxTestSize; - /* init */ - if (testNb < nbTests) { - DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); - } else { DISPLAYUPDATE(2, "\r%6u ", testNb); } FUZ_rand(&coreSeed); + if (nbTests >= testNb) { + DISPLAYUPDATE(2, "\r%6u/%6u (%08X) ", testNb, nbTests, coreSeed); + } else { + DISPLAYUPDATE(2, "\r%6u (%08X) ", testNb, coreSeed); + } lseed = coreSeed ^ prime32; /* states full reset (deliberately not synchronized) */ @@ -1223,7 +1227,6 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp ZSTDMT_freeCCtx(zc); zc = ZSTDMT_createCCtx(nbThreads); CHECK(zc==NULL, "ZSTDMT_createCCtx allocation error") - resetAllowed=0; } if ((FUZ_rand(&lseed) & 0xFF) == 132) { ZSTD_freeDStream(zd); @@ -1248,16 +1251,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp } /* compression init */ - if ((FUZ_rand(&lseed)&1) /* at beginning, to keep same nb of rand */ - && oldTestLog /* at least one test happened */ && resetAllowed) { - maxTestSize = FUZ_randomLength(&lseed, oldTestLog+2); - if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1; - { int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1; - DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel); - CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) ); - } - } else { - U32 const testLog = FUZ_rand(&lseed) % maxSrcLog; + { U32 const testLog = FUZ_rand(&lseed) % maxSrcLog; U32 const dictLog = FUZ_rand(&lseed) % maxSrcLog; int const cLevelCandidate = ( FUZ_rand(&lseed) % (ZSTD_maxCLevel() - (MAX(testLog, dictLog) / 2)) ) @@ -1266,24 +1260,29 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp int const cLevelMin = MAX(cLevelThreadAdjusted, 1); /* no negative cLevel yet */ int const cLevel = MIN(cLevelMin, cLevelMax); maxTestSize = FUZ_rLogLength(&lseed, testLog); - oldTestLog = testLog; - /* random dictionary selection */ - dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0; - { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize); - dict = srcBuffer + dictStart; - } - { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize; - ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize); - DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n", - params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize); - params.fParams.checksumFlag = FUZ_rand(&lseed) & 1; - params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1; - params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1; - DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag); - CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) ); - CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custome job size */ - CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) ); - } } + + if (FUZ_rand(&lseed)&1) { /* simple init */ + int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1; + DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel); + CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) ); + } else { /* advanced init */ + /* random dictionary selection */ + dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0; + { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize); + dict = srcBuffer + dictStart; + } + { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize; + ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize); + DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n", + params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize); + params.fParams.checksumFlag = FUZ_rand(&lseed) & 1; + params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1; + params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1; + DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag); + CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) ); + CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custome job size */ + CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) ); + } } } /* multi-segments compression test */ XXH64_reset(&xxhState, 0); @@ -1336,10 +1335,13 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp } } crcOrig = XXH64_digest(&xxhState); cSize = outBuff.pos; - DISPLAYLEVEL(5, "Frame completed : %u bytes \n", (U32)cSize); + DISPLAYLEVEL(5, "Frame completed : %u bytes compressed into %u bytes \n", + (U32)totalTestSize, (U32)cSize); } /* multi - fragments decompression test */ + assert(totalTestSize < dstBufferSize); + memset(dstBuffer, 170, totalTestSize); /* init dest area */ if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) { CHECK_Z( ZSTD_resetDStream(zd) ); } else { @@ -1354,14 +1356,16 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize); inBuff.size = inBuff.pos + readCSrcSize; outBuff.size = outBuff.pos + dstBuffSize; - DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize); + DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes into outBuff %u bytes \n", + (U32)readCSrcSize, (U32)dstBuffSize); decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); if (ZSTD_isError(decompressionResult)) { DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult)); findDiff(copyBuffer, dstBuffer, totalTestSize); } CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult)); - DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u \n", (U32)inBuff.pos); + DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u and produced (outBuff.pos) = %u \n", + (U32)inBuff.pos, (U32)outBuff.pos); } CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize); CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);