]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added streaming fuzzer tests for MT API
authorYann Collet <cyan@fb.com>
Thu, 19 Jan 2017 20:12:50 +0000 (12:12 -0800)
committerYann Collet <cyan@fb.com>
Thu, 19 Jan 2017 20:15:29 +0000 (12:15 -0800)
Also : fixed corner case, where nb of jobs completed becomes > jobQueueSize
which is possible when many flushes are issued
while there is not enough dst buffer to flush completed ones.

lib/compress/zstdmt_compress.c
programs/bench.c
tests/Makefile
tests/zstreamtest.c

index b060b73f483ab29d6d50f40313a8e129d3414576..775c52aae84c8fe4906141340c712664cd3b0117 100644 (file)
@@ -243,7 +243,7 @@ struct ZSTDMT_CCtx_s {
 ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
 {
     ZSTDMT_CCtx* cctx;
-    U32 const minNbJobs = nbThreads + 1;
+    U32 const minNbJobs = nbThreads + 2;
     U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1;
     U32 const nbJobs = 1 << nbJobsLog2;
     DEBUGLOG(4, "nbThreads : %u  ; minNbJobs : %u ;  nbJobsLog2 : %u ;  nbJobs : %u  \n",
@@ -436,7 +436,8 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
         zcs->inBuff.filled += toLoad;
     }
 
-    if (zcs->inBuff.filled == zcs->inBuffSize) {   /* filled enough : let's compress */
+    if ( (zcs->inBuff.filled == zcs->inBuffSize)  /* filled enough : let's compress */
+        && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {   /* avoid overwriting job round buffer */
         size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize);
         buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
         ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
@@ -477,8 +478,8 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
         zcs->inBuff.filled = (U32)(zcs->inBuffSize - zcs->targetSectionSize);
         memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.filled);
 
-        DEBUGLOG(3, "posting job %u   (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize);
-        POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);
+        DEBUGLOG(3, "posting job %u   (%u bytes)  (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
+        POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* This call is blocking if all workers are busy */
         zcs->nextJobID++;
     }
 
@@ -487,7 +488,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
         ZSTDMT_jobDescription job = zcs->jobs[jobID];
         if (job.jobCompleted) {   /* job completed : output can be flushed */
             size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
-            DEBUGLOG(1, "trying to flush compressed data from job %u \n", (U32)zcs->doneJobID);
+            DEBUGLOG(1, "flush %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
             ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
             zcs->jobs[jobID].cctx = NULL;
             ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
@@ -500,6 +501,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
             memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
             output->pos += toWrite;
             job.dstFlushed += toWrite;
+            DEBUGLOG(1, "remaining : %u bytes ", (U32)(job.cSize - job.dstFlushed));
             if (job.dstFlushed == job.cSize) {   /* output buffer fully flushed => go to next one */
                 ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
                 zcs->jobs[jobID].dstBuff = g_nullBuffer;
@@ -519,7 +521,8 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
     size_t const srcSize = zcs->inBuff.filled;
 
     DEBUGLOG(1, "flushing : %u bytes to compress", (U32)srcSize);
-    if ((srcSize > 0) || (endFrame && !zcs->frameEnded)) {
+    if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
+       && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
         size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
         buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
         ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
@@ -564,7 +567,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
             zcs->frameEnded = 1;
         }
 
-        DEBUGLOG(1, "posting job %u : %u bytes  (end:%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk);
+        DEBUGLOG(1, "posting job %u : %u bytes  (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
         POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* this call is blocking when thread worker pool is exhausted */
         zcs->nextJobID++;
     }
@@ -575,7 +578,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
     {   unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
         PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
         while (zcs->jobs[wJobID].jobCompleted==0) {
-            DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID);   /* we want to block when waiting for data to flush */
+            DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);   /* we want to block when waiting for data to flush */
             pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
         }
         pthread_mutex_unlock(&zcs->jobCompleted_mutex);
@@ -602,7 +605,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
             }
             /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
             if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
-            if (zcs->doneJobID < zcs->nextJobID) return 1;   /* still some buffer to flush */
+            if ((zcs->doneJobID < zcs->nextJobID) || (zcs->inBuff.filled)) return 1;   /* still some buffer to flush */
             zcs->allJobsCompleted = zcs->frameEnded;
             return 0;
     }   }
index 40e1d4abaa8e4556d96f894f488182f797207f5a..5299b47120120b9910ec6b3fb33f569fc30510ba 100644 (file)
@@ -385,6 +385,17 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
                             pos = (U32)(u - bacc);
                             bNb = pos / (128 KB);
                             DISPLAY("(block %u, sub %u, pos %u) \n", segNb, bNb, pos);
+                            if (u>5) {
+                                int n;
+                                for (n=-5; n<0; n++) DISPLAY("%02X ", ((const BYTE*)srcBuffer)[u+n]);
+                                DISPLAY(" :%02X:  ", ((const BYTE*)srcBuffer)[u]);
+                                for (n=1; n<3; n++) DISPLAY("%02X ", ((const BYTE*)srcBuffer)[u+n]);
+                                DISPLAY(" \n");
+                                for (n=-5; n<0; n++) DISPLAY("%02X ", ((const BYTE*)resultBuffer)[u+n]);
+                                DISPLAY(" :%02X:  ", ((const BYTE*)resultBuffer)[u]);
+                                for (n=1; n<3; n++) DISPLAY("%02X ", ((const BYTE*)resultBuffer)[u+n]);
+                                DISPLAY(" \n");
+                            }
                             break;
                         }
                         if (u==srcSize-1) {  /* should never happen */
index 6312584a931a8cf62d1ccdb06bede79619b389d0..2f399242e90cdea60070e12e4407c9413027a4fe 100644 (file)
 # zstreamtest32: Same as zstreamtest, but forced to compile in 32-bits mode
 # ##########################################################################
 
-DESTDIR?=
-PREFIX ?= /usr/local
-BINDIR  = $(PREFIX)/bin
-MANDIR  = $(PREFIX)/share/man/man1
 ZSTDDIR = ../lib
 PRGDIR  = ../programs
 PYTHON ?= python3
 TESTARTEFACT := versionsTest namespaceTest
 
 
-CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/dictBuilder -I$(ZSTDDIR)/deprecated -I$(PRGDIR)
+CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/compress -I$(ZSTDDIR)/dictBuilder -I$(ZSTDDIR)/deprecated -I$(PRGDIR)
 CFLAGS  ?= -O3
 CFLAGS  += -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \
           -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef
index ce619308572cc40af5410b430fa5978691f2ab58..8720ec78a76279dcf3fa9f8e3223b3d9023e1af4 100644 (file)
@@ -29,6 +29,7 @@
 #define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_maxCLevel, ZSTD_customMem */
 #include "zstd.h"         /* ZSTD_compressBound */
 #include "zstd_errors.h"  /* ZSTD_error_srcSize_wrong */
+#include "zstdmt_compress.h"
 #include "datagen.h"      /* RDG_genBuffer */
 #define XXH_STATIC_LINKING_ONLY   /* XXH64_state_t */
 #include "xxhash.h"       /* XXH64_* */
@@ -137,7 +138,7 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
     cSize = skippableFrameSize + 8;
 
     /* Basic compression test */
-    DISPLAYLEVEL(4, "test%3i : compress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
+    DISPLAYLEVEL(3, "test%3i : compress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
     ZSTD_initCStream_usingDict(zc, CNBuffer, 128 KB, 1);
     outBuff.dst = (char*)(compressedBuffer)+cSize;
     outBuff.size = compressedBufferSize;
@@ -151,16 +152,16 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
     { size_t const r = ZSTD_endStream(zc, &outBuff);
       if (r != 0) goto _output_error; }  /* error, or some data not flushed */
     cSize += outBuff.pos;
-    DISPLAYLEVEL(4, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/COMPRESSIBLE_NOISE_LENGTH*100);
+    DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/COMPRESSIBLE_NOISE_LENGTH*100);
 
-    DISPLAYLEVEL(4, "test%3i : check CStream size : ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : check CStream size : ", testNb++);
     { size_t const s = ZSTD_sizeof_CStream(zc);
       if (ZSTD_isError(s)) goto _output_error;
-      DISPLAYLEVEL(4, "OK (%u bytes) \n", (U32)s);
+      DISPLAYLEVEL(3, "OK (%u bytes) \n", (U32)s);
     }
 
     /* skippable frame test */
-    DISPLAYLEVEL(4, "test%3i : decompress skippable frame : ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : decompress skippable frame : ", testNb++);
     ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
     inBuff.src = compressedBuffer;
     inBuff.size = cSize;
@@ -171,11 +172,11 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
     { size_t const r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
       if (r != 0) goto _output_error; }
     if (outBuff.pos != 0) goto _output_error;   /* skippable frame len is 0 */
-    DISPLAYLEVEL(4, "OK \n");
+    DISPLAYLEVEL(3, "OK \n");
 
     /* Basic decompression test */
     inBuff2 = inBuff;
-    DISPLAYLEVEL(4, "test%3i : decompress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
+    DISPLAYLEVEL(3, "test%3i : decompress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
     ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
     { size_t const r = ZSTD_setDStreamParameter(zd, ZSTDdsp_maxWindowSize, 1000000000);  /* large limit */
       if (ZSTD_isError(r)) goto _output_error; }
@@ -183,33 +184,33 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
       if (remaining != 0) goto _output_error; }  /* should reach end of frame == 0; otherwise, some data left, or an error */
     if (outBuff.pos != CNBufferSize) goto _output_error;   /* should regenerate the same amount */
     if (inBuff.pos != inBuff.size) goto _output_error;   /* should have read the entire frame */
-    DISPLAYLEVEL(4, "OK \n");
+    DISPLAYLEVEL(3, "OK \n");
 
     /* Re-use without init */
-    DISPLAYLEVEL(4, "test%3i : decompress again without init (re-use previous settings): ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : decompress again without init (re-use previous settings): ", testNb++);
     outBuff.pos = 0;
     { size_t const remaining = ZSTD_decompressStream(zd, &outBuff, &inBuff2);
       if (remaining != 0) goto _output_error; }  /* should reach end of frame == 0; otherwise, some data left, or an error */
     if (outBuff.pos != CNBufferSize) goto _output_error;   /* should regenerate the same amount */
     if (inBuff.pos != inBuff.size) goto _output_error;   /* should have read the entire frame */
-    DISPLAYLEVEL(4, "OK \n");
+    DISPLAYLEVEL(3, "OK \n");
 
     /* check regenerated data is byte exact */
-    DISPLAYLEVEL(4, "test%3i : check decompressed result : ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : check decompressed result : ", testNb++);
     {   size_t i;
         for (i=0; i<CNBufferSize; i++) {
             if (((BYTE*)decodedBuffer)[i] != ((BYTE*)CNBuffer)[i]) goto _output_error;
     }   }
-    DISPLAYLEVEL(4, "OK \n");
+    DISPLAYLEVEL(3, "OK \n");
 
-    DISPLAYLEVEL(4, "test%3i : check DStream size : ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : check DStream size : ", testNb++);
     { size_t const s = ZSTD_sizeof_DStream(zd);
       if (ZSTD_isError(s)) goto _output_error;
-      DISPLAYLEVEL(4, "OK (%u bytes) \n", (U32)s);
+      DISPLAYLEVEL(3, "OK (%u bytes) \n", (U32)s);
     }
 
     /* Byte-by-byte decompression test */
-    DISPLAYLEVEL(4, "test%3i : decompress byte-by-byte : ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : decompress byte-by-byte : ", testNb++);
     {   /* skippable frame */
         size_t r = 1;
         ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
@@ -235,18 +236,18 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
     }
     if (outBuff.pos != CNBufferSize) goto _output_error;   /* should regenerate the same amount */
     if (inBuff.pos != cSize) goto _output_error;   /* should have read the entire frame */
-    DISPLAYLEVEL(4, "OK \n");
+    DISPLAYLEVEL(3, "OK \n");
 
     /* check regenerated data is byte exact */
-    DISPLAYLEVEL(4, "test%3i : check decompressed result : ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : check decompressed result : ", testNb++);
     {   size_t i;
         for (i=0; i<CNBufferSize; i++) {
             if (((BYTE*)decodedBuffer)[i] != ((BYTE*)CNBuffer)[i]) goto _output_error;;
     }   }
-    DISPLAYLEVEL(4, "OK \n");
+    DISPLAYLEVEL(3, "OK \n");
 
     /* _srcSize compression test */
-    DISPLAYLEVEL(4, "test%3i : compress_srcSize %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
+    DISPLAYLEVEL(3, "test%3i : compress_srcSize %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
     ZSTD_initCStream_srcSize(zc, 1, CNBufferSize);
     outBuff.dst = (char*)(compressedBuffer);
     outBuff.size = compressedBufferSize;
@@ -260,13 +261,11 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
     { size_t const r = ZSTD_endStream(zc, &outBuff);
       if (r != 0) goto _output_error; }  /* error, or some data not flushed */
     { unsigned long long origSize = ZSTD_getDecompressedSize(outBuff.dst, outBuff.pos);
-      DISPLAY("outBuff.pos : %u \n", (U32)outBuff.pos);
-      DISPLAY("origSize = %u \n", (U32)origSize);
       if ((size_t)origSize != CNBufferSize) goto _output_error; }  /* exact original size must be present */
-    DISPLAYLEVEL(4, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/COMPRESSIBLE_NOISE_LENGTH*100);
+    DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/COMPRESSIBLE_NOISE_LENGTH*100);
 
     /* wrong _srcSize compression test */
-    DISPLAYLEVEL(4, "test%3i : wrong srcSize : %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH-1);
+    DISPLAYLEVEL(3, "test%3i : wrong srcSize : %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH-1);
     ZSTD_initCStream_srcSize(zc, 1, CNBufferSize-1);
     outBuff.dst = (char*)(compressedBuffer);
     outBuff.size = compressedBufferSize;
@@ -279,10 +278,10 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
     if (inBuff.pos != inBuff.size) goto _output_error;   /* entire input should be consumed */
     { size_t const r = ZSTD_endStream(zc, &outBuff);
       if (ZSTD_getErrorCode(r) != ZSTD_error_srcSize_wrong) goto _output_error;    /* must fail : wrong srcSize */
-      DISPLAYLEVEL(4, "OK (error detected : %s) \n", ZSTD_getErrorName(r)); }
+      DISPLAYLEVEL(3, "OK (error detected : %s) \n", ZSTD_getErrorName(r)); }
 
     /* Complex context re-use scenario */
-    DISPLAYLEVEL(4, "test%3i : context re-use : ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : context re-use : ", testNb++);
     ZSTD_freeCStream(zc);
     zc = ZSTD_createCStream_advanced(customMem);
     if (zc==NULL) goto _output_error;   /* memory allocation issue */
@@ -316,10 +315,10 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
         { size_t const r = ZSTD_endStream(zc, &outBuff);
             if (r != 0) goto _output_error; }  /* error, or some data not flushed */
     }
-    DISPLAYLEVEL(4, "OK \n");
+    DISPLAYLEVEL(3, "OK \n");
 
     /* CDict scenario */
-    DISPLAYLEVEL(4, "test%3i : digested dictionary : ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : digested dictionary : ", testNb++);
     {   ZSTD_CDict* const cdict = ZSTD_createCDict(CNBuffer, 128 KB, 1);
         size_t const initError = ZSTD_initCStream_usingCDict(zc, cdict);
         if (ZSTD_isError(initError)) goto _output_error;
@@ -337,17 +336,17 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
           if (r != 0) goto _output_error; }  /* error, or some data not flushed */
         cSize = outBuff.pos;
         ZSTD_freeCDict(cdict);
-        DISPLAYLEVEL(4, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/CNBufferSize*100);
+        DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/CNBufferSize*100);
     }
 
-    DISPLAYLEVEL(4, "test%3i : check CStream size : ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : check CStream size : ", testNb++);
     { size_t const s = ZSTD_sizeof_CStream(zc);
       if (ZSTD_isError(s)) goto _output_error;
-      DISPLAYLEVEL(4, "OK (%u bytes) \n", (U32)s);
+      DISPLAYLEVEL(3, "OK (%u bytes) \n", (U32)s);
     }
 
     /* DDict scenario */
-    DISPLAYLEVEL(4, "test%3i : decompress %u bytes with digested dictionary : ", testNb++, (U32)CNBufferSize);
+    DISPLAYLEVEL(3, "test%3i : decompress %u bytes with digested dictionary : ", testNb++, (U32)CNBufferSize);
     {   ZSTD_DDict* const ddict = ZSTD_createDDict(CNBuffer, 128 KB);
         size_t const initError = ZSTD_initDStream_usingDDict(zd, ddict);
         if (ZSTD_isError(initError)) goto _output_error;
@@ -362,17 +361,17 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
         if (outBuff.pos != CNBufferSize) goto _output_error;   /* should regenerate the same amount */
         if (inBuff.pos != inBuff.size) goto _output_error;   /* should have read the entire frame */
         ZSTD_freeDDict(ddict);
-        DISPLAYLEVEL(4, "OK \n");
+        DISPLAYLEVEL(3, "OK \n");
     }
 
     /* test ZSTD_setDStreamParameter() resilience */
-    DISPLAYLEVEL(4, "test%3i : wrong parameter for ZSTD_setDStreamParameter(): ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : wrong parameter for ZSTD_setDStreamParameter(): ", testNb++);
     { size_t const r = ZSTD_setDStreamParameter(zd, (ZSTD_DStreamParameter_e)999, 1);  /* large limit */
       if (!ZSTD_isError(r)) goto _output_error; }
-    DISPLAYLEVEL(4, "OK \n");
+    DISPLAYLEVEL(3, "OK \n");
 
     /* Memory restriction */
-    DISPLAYLEVEL(4, "test%3i : maxWindowSize < frame requirement : ", testNb++);
+    DISPLAYLEVEL(3, "test%3i : maxWindowSize < frame requirement : ", testNb++);
     ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
     { size_t const r = ZSTD_setDStreamParameter(zd, ZSTDdsp_maxWindowSize, 1000);  /* too small limit */
       if (ZSTD_isError(r)) goto _output_error; }
@@ -384,7 +383,7 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
     outBuff.pos = 0;
     { size_t const r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
       if (!ZSTD_isError(r)) goto _output_error;  /* must fail : frame requires > 100 bytes */
-      DISPLAYLEVEL(4, "OK (%s)\n", ZSTD_getErrorName(r)); }
+      DISPLAYLEVEL(3, "OK (%s)\n", ZSTD_getErrorName(r)); }
 
 
 _end:
@@ -412,6 +411,7 @@ static size_t findDiff(const void* buf1, const void* buf2, size_t max)
     for (u=0; u<max; u++) {
         if (b1[u] != b2[u]) break;
     }
+    DISPLAY("Error at position %u / %u \n", (U32)u, (U32)max);
     return u;
 }
 
@@ -660,6 +660,245 @@ _output_error:
 }
 
 
+/* Multi-threading version of fuzzer Tests */
+static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double compressibility)
+{
+    static const U32 maxSrcLog = 24;
+    static const U32 maxSampleLog = 19;
+    size_t const srcBufferSize = (size_t)1<<maxSrcLog;
+    BYTE* cNoiseBuffer[5];
+    size_t const copyBufferSize= srcBufferSize + (1<<maxSampleLog);
+    BYTE*  const copyBuffer = (BYTE*)malloc (copyBufferSize);
+    size_t const cBufferSize   = ZSTD_compressBound(srcBufferSize);
+    BYTE*  const cBuffer = (BYTE*)malloc (cBufferSize);
+    size_t const dstBufferSize = srcBufferSize;
+    BYTE*  const dstBuffer = (BYTE*)malloc (dstBufferSize);
+    U32 result = 0;
+    U32 testNb = 0;
+    U32 coreSeed = seed;
+    ZSTDMT_CCtx* zc = ZSTDMT_createCCtx(2);   /* will be reset sometimes */
+    ZSTD_DStream* zd = ZSTD_createDStream();   /* will be reset sometimes */
+    ZSTD_DStream* const zd_noise = ZSTD_createDStream();
+    clock_t const startClock = clock();
+    const BYTE* dict=NULL;   /* can keep same dict on 2 consecutive tests */
+    size_t dictSize = 0;
+    U32 oldTestLog = 0;
+
+    /* allocations */
+    cNoiseBuffer[0] = (BYTE*)malloc (srcBufferSize);
+    cNoiseBuffer[1] = (BYTE*)malloc (srcBufferSize);
+    cNoiseBuffer[2] = (BYTE*)malloc (srcBufferSize);
+    cNoiseBuffer[3] = (BYTE*)malloc (srcBufferSize);
+    cNoiseBuffer[4] = (BYTE*)malloc (srcBufferSize);
+    CHECK (!cNoiseBuffer[0] || !cNoiseBuffer[1] || !cNoiseBuffer[2] || !cNoiseBuffer[3] || !cNoiseBuffer[4] ||
+           !copyBuffer || !dstBuffer || !cBuffer || !zc || !zd || !zd_noise ,
+           "Not enough memory, fuzzer tests cancelled");
+
+    /* Create initial samples */
+    RDG_genBuffer(cNoiseBuffer[0], srcBufferSize, 0.00, 0., coreSeed);    /* pure noise */
+    RDG_genBuffer(cNoiseBuffer[1], srcBufferSize, 0.05, 0., coreSeed);    /* barely compressible */
+    RDG_genBuffer(cNoiseBuffer[2], srcBufferSize, compressibility, 0., coreSeed);
+    RDG_genBuffer(cNoiseBuffer[3], srcBufferSize, 0.95, 0., coreSeed);    /* highly compressible */
+    RDG_genBuffer(cNoiseBuffer[4], srcBufferSize, 1.00, 0., coreSeed);    /* sparse content */
+    memset(copyBuffer, 0x65, copyBufferSize);                             /* make copyBuffer considered initialized */
+    ZSTD_initDStream_usingDict(zd, NULL, 0);  /* ensure at least one init */
+
+    /* catch up testNb */
+    for (testNb=1; testNb < startTest; testNb++)
+        FUZ_rand(&coreSeed);
+
+    /* test loop */
+    for ( ; (testNb <= nbTests) || (FUZ_GetClockSpan(startClock) < g_clockTime) ; testNb++ ) {
+        U32 lseed;
+        const BYTE* srcBuffer;
+        size_t totalTestSize, totalGenSize, cSize;
+        XXH64_state_t xxhState;
+        U64 crcOrig;
+        U32 resetAllowed = 1;
+        size_t maxTestSize;
+
+        /* init */
+        if (nbTests >= testNb) { DISPLAYUPDATE(2, "\r%6u/%6u    ", testNb, nbTests); }
+        else { DISPLAYUPDATE(2, "\r%6u          ", testNb); }
+        FUZ_rand(&coreSeed);
+        lseed = coreSeed ^ prime1;
+
+        /* states full reset (deliberately not synchronized) */
+        /* some issues can only happen when reusing states */
+        if ((FUZ_rand(&lseed) & 0xFF) == 131) {
+            U32 const nbThreads = (FUZ_rand(&lseed) % 6) + 1;
+            ZSTDMT_freeCCtx(zc);
+            zc = ZSTDMT_createCCtx(nbThreads);
+            resetAllowed=0;
+        }
+        if ((FUZ_rand(&lseed) & 0xFF) == 132) {
+            ZSTD_freeDStream(zd);
+            zd = ZSTD_createDStream();
+            ZSTD_initDStream_usingDict(zd, NULL, 0);  /* ensure at least one init */
+        }
+
+        /* srcBuffer selection [0-4] */
+        {   U32 buffNb = FUZ_rand(&lseed) & 0x7F;
+            if (buffNb & 7) buffNb=2;   /* most common : compressible (P) */
+            else {
+                buffNb >>= 3;
+                if (buffNb & 7) {
+                    const U32 tnb[2] = { 1, 3 };   /* barely/highly compressible */
+                    buffNb = tnb[buffNb >> 3];
+                } else {
+                    const U32 tnb[2] = { 0, 4 };   /* not compressible / sparse */
+                    buffNb = tnb[buffNb >> 3];
+            }   }
+            srcBuffer = cNoiseBuffer[buffNb];
+        }
+
+        /* compression init */
+        if ((FUZ_rand(&lseed)&1) /* at beginning, to keep same nb of rand */
+            && oldTestLog /* at least one test happened */ && resetAllowed) {
+            maxTestSize = FUZ_randomLength(&lseed, oldTestLog+2);
+            if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1;
+            {   int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
+                size_t const resetError = ZSTDMT_initCStream(zc, compressionLevel);
+                CHECK(ZSTD_isError(resetError), "ZSTD_resetCStream error : %s", ZSTD_getErrorName(resetError));
+            }
+        } else {
+            U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
+            U32 const cLevel = (FUZ_rand(&lseed) % (ZSTD_maxCLevel() - (testLog/3))) + 1;
+            maxTestSize = FUZ_rLogLength(&lseed, testLog);
+            oldTestLog = testLog;
+            /* random dictionary selection */
+            dictSize  = 0;
+            dict = NULL;
+            {   U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? 0 : maxTestSize;
+                ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
+                params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
+                params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
+                {   size_t const initError = ZSTDMT_initCStream(zc, cLevel);
+                    CHECK (ZSTD_isError(initError),"ZSTD_initCStream_advanced error : %s", ZSTD_getErrorName(initError));
+        }   }   }
+
+        /* multi-segments compression test */
+        XXH64_reset(&xxhState, 0);
+        {   ZSTD_outBuffer outBuff = { cBuffer, cBufferSize, 0 } ;
+            U32 n;
+            for (n=0, cSize=0, totalTestSize=0 ; totalTestSize < maxTestSize ; n++) {
+                /* compress random chunks into randomly sized dst buffers */
+                {   size_t const randomSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
+                    size_t const srcSize = MIN (maxTestSize-totalTestSize, randomSrcSize);
+                    size_t const srcStart = FUZ_rand(&lseed) % (srcBufferSize - srcSize);
+                    size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
+                    size_t const dstBuffSize = MIN(cBufferSize - cSize, randomDstSize);
+                    ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 };
+                    outBuff.size = outBuff.pos + dstBuffSize;
+
+                    DISPLAYLEVEL(5, "Sending %u bytes to compress \n", (U32)srcSize);
+                    { size_t const compressionError = ZSTDMT_compressStream(zc, &outBuff, &inBuff);
+                      CHECK (ZSTD_isError(compressionError), "compression error : %s", ZSTD_getErrorName(compressionError)); }
+
+                    XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
+                    memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
+                    totalTestSize += inBuff.pos;
+                }
+
+                /* random flush operation, to mess around */
+                if ((FUZ_rand(&lseed) & 15) == 0) {
+                    size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
+                    size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize);
+                    outBuff.size = outBuff.pos + adjustedDstSize;
+                    DISPLAYLEVEL(5, "Flushing into dst buffer of size %u \n", (U32)adjustedDstSize);
+                    {   size_t const flushError = ZSTDMT_flushStream(zc, &outBuff);
+                        CHECK (ZSTD_isError(flushError), "flush error : %s", ZSTD_getErrorName(flushError));
+            }   }   }
+
+            /* final frame epilogue */
+            {   size_t remainingToFlush = (size_t)(-1);
+                while (remainingToFlush) {
+                    size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
+                    size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize);
+                    outBuff.size = outBuff.pos + adjustedDstSize;
+                    DISPLAYLEVEL(5, "Ending into dst buffer of size %u \n", (U32)adjustedDstSize);
+                    remainingToFlush = ZSTDMT_endStream(zc, &outBuff);
+                    CHECK (ZSTD_isError(remainingToFlush), "flush error : %s", ZSTD_getErrorName(remainingToFlush));
+                    DISPLAYLEVEL(5, "endStream : remainingToFlush : %u \n", (U32)remainingToFlush);
+            }   }
+            DISPLAYLEVEL(5, "Frame completed \n");
+            crcOrig = XXH64_digest(&xxhState);
+            cSize = outBuff.pos;
+        }
+
+        /* multi - fragments decompression test */
+        if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) {
+            CHECK (ZSTD_isError(ZSTD_resetDStream(zd)), "ZSTD_resetDStream failed");
+        } else {
+            ZSTD_initDStream_usingDict(zd, dict, dictSize);
+        }
+        {   size_t decompressionResult = 1;
+            ZSTD_inBuffer  inBuff = { cBuffer, cSize, 0 };
+            ZSTD_outBuffer outBuff= { dstBuffer, dstBufferSize, 0 };
+            for (totalGenSize = 0 ; decompressionResult ; ) {
+                size_t const readCSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
+                size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
+                size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize);
+                inBuff.size = inBuff.pos + readCSrcSize;
+                outBuff.size = inBuff.pos + dstBuffSize;
+                decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
+                CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
+            }
+            CHECK (decompressionResult != 0, "frame not fully decoded");
+            CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size")
+            CHECK (inBuff.pos != cSize, "compressed data should be fully read")
+            {   U64 const crcDest = XXH64(dstBuffer, totalTestSize, 0);
+                if (crcDest!=crcOrig) findDiff(copyBuffer, dstBuffer, totalTestSize);
+                CHECK (crcDest!=crcOrig, "decompressed data corrupted");
+        }   }
+
+        /*=====   noisy/erroneous src decompression test   =====*/
+
+        /* add some noise */
+        {   U32 const nbNoiseChunks = (FUZ_rand(&lseed) & 7) + 2;
+            U32 nn; for (nn=0; nn<nbNoiseChunks; nn++) {
+                size_t const randomNoiseSize = FUZ_randomLength(&lseed, maxSampleLog);
+                size_t const noiseSize  = MIN((cSize/3) , randomNoiseSize);
+                size_t const noiseStart = FUZ_rand(&lseed) % (srcBufferSize - noiseSize);
+                size_t const cStart = FUZ_rand(&lseed) % (cSize - noiseSize);
+                memcpy(cBuffer+cStart, srcBuffer+noiseStart, noiseSize);
+        }   }
+
+        /* try decompression on noisy data */
+        ZSTD_initDStream(zd_noise);   /* note : no dictionary */
+        {   ZSTD_inBuffer  inBuff = { cBuffer, cSize, 0 };
+            ZSTD_outBuffer outBuff= { dstBuffer, dstBufferSize, 0 };
+            while (outBuff.pos < dstBufferSize) {
+                size_t const randomCSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
+                size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
+                size_t const adjustedDstSize = MIN(dstBufferSize - outBuff.pos, randomDstSize);
+                outBuff.size = outBuff.pos + adjustedDstSize;
+                inBuff.size  = inBuff.pos + randomCSrcSize;
+                {   size_t const decompressError = ZSTD_decompressStream(zd, &outBuff, &inBuff);
+                    if (ZSTD_isError(decompressError)) break;   /* error correctly detected */
+    }   }   }   }
+    DISPLAY("\r%u fuzzer tests completed   \n", testNb);
+
+_cleanup:
+    ZSTDMT_freeCCtx(zc);
+    ZSTD_freeDStream(zd);
+    ZSTD_freeDStream(zd_noise);
+    free(cNoiseBuffer[0]);
+    free(cNoiseBuffer[1]);
+    free(cNoiseBuffer[2]);
+    free(cNoiseBuffer[3]);
+    free(cNoiseBuffer[4]);
+    free(copyBuffer);
+    free(cBuffer);
+    free(dstBuffer);
+    return result;
+
+_output_error:
+    result = 1;
+    goto _cleanup;
+}
+
+
 /*-*******************************************************
 *  Command line
 *********************************************************/
@@ -708,20 +947,23 @@ int main(int argc, const char** argv)
                 {
                 case 'h':
                     return FUZ_usage(programName);
+
                 case 'v':
                     argument++;
-                    g_displayLevel=4;
+                    g_displayLevel++;
                     break;
+
                 case 'q':
                     argument++;
                     g_displayLevel--;
                     break;
+
                 case 'p': /* pause at the end */
                     argument++;
                     mainPause = 1;
                     break;
 
-                case 'i':
+                case 'i':   /* limit tests by nb of iterations (default) */
                     argument++;
                     nbTests=0; g_clockTime=0;
                     while ((*argument>='0') && (*argument<='9')) {
@@ -731,7 +973,7 @@ int main(int argc, const char** argv)
                     }
                     break;
 
-                case 'T':
+                case 'T':   /* limit tests by time */
                     argument++;
                     nbTests=0; g_clockTime=0;
                     while ((*argument>='0') && (*argument<='9')) {
@@ -744,7 +986,7 @@ int main(int argc, const char** argv)
                     g_clockTime *= CLOCKS_PER_SEC;
                     break;
 
-                case 's':
+                case 's':   /* manually select seed */
                     argument++;
                     seed=0;
                     seedset=1;
@@ -755,7 +997,7 @@ int main(int argc, const char** argv)
                     }
                     break;
 
-                case 't':
+                case 't':   /* select starting test number */
                     argument++;
                     testNb=0;
                     while ((*argument>='0') && (*argument<='9')) {
@@ -799,12 +1041,12 @@ int main(int argc, const char** argv)
     if (testNb==0) {
         result = basicUnitTests(0, ((double)proba) / 100, customNULL);  /* constant seed for predictability */
         if (!result) {
-            DISPLAYLEVEL(4, "Unit tests using customMem :\n")
+            DISPLAYLEVEL(3, "Unit tests using customMem :\n")
             result = basicUnitTests(0, ((double)proba) / 100, customMem);  /* use custom memory allocation functions */
     }   }
 
-    if (!result)
-        result = fuzzerTests(seed, nbTests, testNb, ((double)proba) / 100);
+    if (!result) result = fuzzerTests(seed, nbTests, testNb, ((double)proba) / 100);
+    if (!result) result = fuzzerTests_MT(seed, nbTests, testNb, ((double)proba) / 100);
 
     if (mainPause) {
         int unused;