]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt : finally vanquished an elusive and rare race condition
authorYann Collet <cyan@fb.com>
Sat, 20 Jan 2018 01:35:08 +0000 (17:35 -0800)
committerYann Collet <cyan@fb.com>
Sat, 20 Jan 2018 01:35:08 +0000 (17:35 -0800)
lib/compress/zstdmt_compress.c
tests/zstreamtest.c

index 5b253f162fe1a901aa4c5b027d2803fdc85643cc..3e665f574e391ef125bcf389f6f4bfbb806476ed 100644 (file)
@@ -1003,9 +1003,11 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
 {
     unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
 
-    unsigned const limitID = zcs->doneJobID & zcs->jobIDMask;
-    if ((zcs->doneJobID < zcs->nextJobID) & (jobID == limitID))
-        return 0;  /* new job would overwrite unflushed older job */
+    if (zcs->nextJobID > zcs->doneJobID + zcs->jobIDMask) {
+        DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full");
+        assert((zcs->nextJobID & zcs->jobIDMask) == (zcs->doneJobID & zcs->jobIDMask));
+        return 0;
+    }
 
     if (!zcs->jobReady) {
         DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
@@ -1079,76 +1081,78 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
 }
 
 
-/*! ZSTDMT_flushNextJob() :
- * `output` : will be updated with amount of data flushed .
+/*! ZSTDMT_flushProduced() :
+ * `output` : `pos` will be updated with amount of data flushed .
  * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
  * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
-static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
+static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
 {
     unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
-    DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
-    if (zcs->doneJobID == zcs->nextJobID) {
-        DEBUGLOG(5, "ZSTDMT_flushNextJob: doneJobID(%u)==(%u)nextJobID : nothing to flush !",
-                    zcs->doneJobID, zcs->nextJobID)
-        return 0;   /* all flushed ! */
-    }
+    DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
+    assert(output->size >= output->pos);
+
     ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
-    while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
-        if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; }  /* nothing ready to be flushed => skip */
-        if (zcs->jobs[wJobID].jobCompleted==1) break;
-        DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
-                    zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
-        ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);  /* block when nothing available to flush but more to come */
-    }
+    if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) {
+        while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
+            if (zcs->jobs[wJobID].jobCompleted==1) break;
+            DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
+                        zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
+            ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);  /* block when nothing available to flush but more to come */
+    }   }
 
     /* some output is available to be flushed */
     {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
         ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
         if (ZSTD_isError(job.cSize)) {
-            DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s",
+            DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
                         zcs->doneJobID, ZSTD_getErrorName(job.cSize));
             ZSTDMT_waitForAllJobsCompleted(zcs);
             ZSTDMT_releaseAllJobResources(zcs);
             return job.cSize;
         }
-        /* add frame checksum if necessary */
+        /* add frame checksum if necessary (can only happen once) */
         if ( job.jobCompleted
           && job.frameChecksumNeeded ) {
             U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
-            DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum);
+            DEBUGLOG(5, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
             MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
             job.cSize += 4;
             zcs->jobs[wJobID].cSize += 4;
             zcs->jobs[wJobID].frameChecksumNeeded = 0;
         }
         assert(job.cSize >= job.dstFlushed);
-        {   size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
-            DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u (completion:%.1f%%)",
+        if (job.dstBuff.start != NULL) {  /* one buffer present : some job is ongoing */
+            size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+            DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)",
                         (U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100);
             memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
             output->pos += toWrite;
             job.dstFlushed += toWrite;
-        }
-        if ( job.jobCompleted
-          && (job.dstFlushed == job.cSize) ) {   /* output buffer fully flushed => move to next one */
-            DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
-                    zcs->doneJobID, (U32)job.dstFlushed);
-            ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
-            zcs->jobs[wJobID].dstBuff = g_nullBuffer;
-            zcs->jobs[wJobID].jobCompleted = 0;
-            zcs->consumed += job.srcSize;
-            zcs->produced += job.cSize;
-            zcs->doneJobID++;
-        } else {
-            zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
-        }
+
+            if ( job.jobCompleted
+              && (job.dstFlushed == job.cSize) ) {   /* output buffer fully flushed => move to next one */
+                DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
+                        zcs->doneJobID, (U32)job.dstFlushed);
+                ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
+                zcs->jobs[wJobID].dstBuff = g_nullBuffer;
+                zcs->jobs[wJobID].jobCompleted = 0;
+                zcs->consumed += job.srcSize;
+                zcs->produced += job.cSize;
+                zcs->doneJobID++;
+            } else {
+                zcs->jobs[wJobID].dstFlushed = job.dstFlushed;   /* remember how much was flushed for next attempt */
+        }   }
+
         /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
         if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
-        if (zcs->doneJobID < zcs->nextJobID) return 1;   /* still some more buffer to flush */
-        if (zcs->jobReady) return 1;   /* some more work to do ! */
-        zcs->allJobsCompleted = zcs->frameEnded;   /* last frame entirely flushed */
-        return 0;   /* everything flushed */
-}   }
+        if (job.srcSize > job.consumed) return 1;   /* current job not completely compressed */
+    }
+    if (zcs->doneJobID < zcs->nextJobID) return 1;   /* some more jobs to flush */
+    if (zcs->jobReady) return 1;   /* at least one more job to do ! */
+    if (zcs->inBuff.filled > 0) return 1;   /* input not empty */
+    zcs->allJobsCompleted = zcs->frameEnded;   /* last frame entirely flushed */
+    return 0;   /* everything flushed */
+}
 
 
 /** ZSTDMT_compressStream_generic() :
@@ -1220,7 +1224,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     }
 
     /* check for potential compressed data ready to be flushed */
-    {   size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
+    {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
         if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not flush yet */
         return remainingToFlush;
     }
@@ -1244,12 +1248,13 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* ou
     if ( mtctx->jobReady     /* one job ready for a worker to pick up */
       || (srcSize > 0)       /* still some data within input buffer */
       || (endFrame && !mtctx->frameEnded)) {  /* need a last 0-size block to end frame */
-           DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job");
+           DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)",
+                        (U32)srcSize, endFrame);
         CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
     }
 
     /* check if there is any data available to flush */
-    return ZSTDMT_flushNextJob(mtctx, output, 1 /* blockToFlush */);
+    return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */);
 }
 
 
index 6ec14bb8e156234d11ae3924f94a95e1c2424114..8c66c53bd239f23bba86bd92d9c0bcfc560be3bf 100644 (file)
@@ -99,8 +99,8 @@ unsigned int FUZ_rand(unsigned int* seedPtr)
     if (cond) {                                              \
         DISPLAY("Error => ");                                \
         DISPLAY(__VA_ARGS__);                                \
-        DISPLAY(" (seed %u, test nb %u, line %u)  \n",       \
-                seed, testNb, __LINE__);                     \
+        DISPLAY(" (seed %u, test nb %u, line %u (sig %08X) \n", \
+                seed, testNb, __LINE__, coreSeed);           \
         goto _output_error;                                  \
 }   }
 
@@ -219,6 +219,7 @@ static int basicUnitTests(U32 seed, double compressibility)
     size_t cSize;
     int testResult = 0;
     U32 testNb = 1;
+    U32 coreSeed = 0;  /* just to conform with CHECK_Z macro display */
     ZSTD_CStream* zc = ZSTD_createCStream();
     ZSTD_DStream* zd = ZSTD_createDStream();
     ZSTDMT_CCtx* mtctx = ZSTDMT_createCCtx(2);
@@ -958,10 +959,13 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres
         size_t maxTestSize;
 
         /* init */
-        if (nbTests >= testNb) { DISPLAYUPDATE(2, "\r%6u/%6u    ", testNb, nbTests); }
-        else { DISPLAYUPDATE(2, "\r%6u          ", testNb); }
         FUZ_rand(&coreSeed);
         lseed = coreSeed ^ prime32;
+        if (nbTests >= testNb) {
+            DISPLAYUPDATE(2, "\r%6u/%6u (%08X)   ", testNb, nbTests, lseed);
+        } else {
+            DISPLAYUPDATE(2, "\r%6u  (%08X)        ", testNb, lseed);
+        }
 
         /* states full reset (deliberately not synchronized) */
         /* some issues can only happen when reusing states */
@@ -1171,7 +1175,6 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
     UTIL_time_t const startClock = UTIL_getTime();
     const BYTE* dict=NULL;   /* can keep same dict on 2 consecutive tests */
     size_t dictSize = 0;
-    U32 oldTestLog = 0;
     int const cLevelMax = bigTests ? (U32)ZSTD_maxCLevel()-1 : g_cLevelMax_smallTests;
     U32 const nbThreadsMax = bigTests ? 4 : 2;
 
@@ -1193,6 +1196,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
     RDG_genBuffer(cNoiseBuffer[4], srcBufferSize, 1.00, 0., coreSeed);    /* sparse content */
     memset(copyBuffer, 0x65, copyBufferSize);                             /* make copyBuffer considered initialized */
     ZSTD_initDStream_usingDict(zd, NULL, 0);  /* ensure at least one init */
+    DISPLAYLEVEL(6, "Creating initial context with %u threads \n", nbThreads);
 
     /* catch up testNb */
     for (testNb=1; testNb < startTest; testNb++)
@@ -1205,14 +1209,14 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
         size_t totalTestSize, totalGenSize, cSize;
         XXH64_state_t xxhState;
         U64 crcOrig;
-        U32 resetAllowed = 1;
         size_t maxTestSize;
 
-        /* init */
-        if (testNb < nbTests) {
-            DISPLAYUPDATE(2, "\r%6u/%6u    ", testNb, nbTests);
-        } else { DISPLAYUPDATE(2, "\r%6u          ", testNb); }
         FUZ_rand(&coreSeed);
+        if (nbTests >= testNb) {
+            DISPLAYUPDATE(2, "\r%6u/%6u (%08X)   ", testNb, nbTests, coreSeed);
+        } else {
+            DISPLAYUPDATE(2, "\r%6u  (%08X)        ", testNb, coreSeed);
+        }
         lseed = coreSeed ^ prime32;
 
         /* states full reset (deliberately not synchronized) */
@@ -1223,7 +1227,6 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
             ZSTDMT_freeCCtx(zc);
             zc = ZSTDMT_createCCtx(nbThreads);
             CHECK(zc==NULL, "ZSTDMT_createCCtx allocation error")
-            resetAllowed=0;
         }
         if ((FUZ_rand(&lseed) & 0xFF) == 132) {
             ZSTD_freeDStream(zd);
@@ -1248,16 +1251,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
         }
 
         /* compression init */
-        if ((FUZ_rand(&lseed)&1) /* at beginning, to keep same nb of rand */
-            && oldTestLog /* at least one test happened */ && resetAllowed) {
-            maxTestSize = FUZ_randomLength(&lseed, oldTestLog+2);
-            if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1;
-            {   int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
-                DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel);
-                CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) );
-            }
-        } else {
-            U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
+        {   U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
             U32 const dictLog = FUZ_rand(&lseed) % maxSrcLog;
             int const cLevelCandidate = ( FUZ_rand(&lseed)
                             % (ZSTD_maxCLevel() - (MAX(testLog, dictLog) / 2)) )
@@ -1266,24 +1260,29 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
             int const cLevelMin = MAX(cLevelThreadAdjusted, 1);  /* no negative cLevel yet */
             int const cLevel = MIN(cLevelMin, cLevelMax);
             maxTestSize = FUZ_rLogLength(&lseed, testLog);
-            oldTestLog = testLog;
-            /* random dictionary selection */
-            dictSize  = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0;
-            {   size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize);
-                dict = srcBuffer + dictStart;
-            }
-            {   U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
-                ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
-                DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n",
-                    params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize);
-                params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
-                params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
-                params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1;
-                DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag);
-                CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) );
-                CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) );   /* custome job size */
-                CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
-        }   }
+
+            if (FUZ_rand(&lseed)&1) {   /* simple init */
+                int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
+                DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel);
+                CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) );
+            } else {   /* advanced init */
+                /* random dictionary selection */
+                dictSize  = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0;
+                {   size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize);
+                    dict = srcBuffer + dictStart;
+                }
+                {   U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
+                    ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
+                    DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n",
+                        params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize);
+                    params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
+                    params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
+                    params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1;
+                    DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag);
+                    CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) );
+                    CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) );   /* custome job size */
+                    CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
+        }   }   }
 
         /* multi-segments compression test */
         XXH64_reset(&xxhState, 0);
@@ -1336,10 +1335,13 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
             }   }
             crcOrig = XXH64_digest(&xxhState);
             cSize = outBuff.pos;
-            DISPLAYLEVEL(5, "Frame completed : %u bytes \n", (U32)cSize);
+            DISPLAYLEVEL(5, "Frame completed : %u bytes compressed into %u bytes \n",
+                            (U32)totalTestSize, (U32)cSize);
         }
 
         /* multi - fragments decompression test */
+        assert(totalTestSize < dstBufferSize);
+        memset(dstBuffer, 170, totalTestSize);   /* init dest area */
         if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) {
             CHECK_Z( ZSTD_resetDStream(zd) );
         } else {
@@ -1354,14 +1356,16 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
                 size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize);
                 inBuff.size = inBuff.pos + readCSrcSize;
                 outBuff.size = outBuff.pos + dstBuffSize;
-                DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize);
+                DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes into outBuff %u bytes \n",
+                                (U32)readCSrcSize, (U32)dstBuffSize);
                 decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
                 if (ZSTD_isError(decompressionResult)) {
                     DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult));
                     findDiff(copyBuffer, dstBuffer, totalTestSize);
                 }
                 CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
-                DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u \n", (U32)inBuff.pos);
+                DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u and produced (outBuff.pos) = %u \n",
+                                (U32)inBuff.pos, (U32)outBuff.pos);
             }
             CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize);
             CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);