]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
fix #944 : ZSTDMT with large files and dictionary now works correctly
authorYann Collet <cyan@fb.com>
Wed, 13 Dec 2017 00:20:51 +0000 (16:20 -0800)
committerYann Collet <cyan@fb.com>
Wed, 13 Dec 2017 02:04:58 +0000 (18:04 -0800)
windowLog is now enforced from provided compression parameters,
instead of being copied blindly from `cdict`
where it could be smaller.

also :
- fix a minor bug in zstreamtest --mt : advanced parameters must be set before init
- changed advanced parameter name to ZSTDMT_jobSize

lib/compress/zstd_compress.c
lib/compress/zstd_compress_internal.h
lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h
tests/zstreamtest.c

index 1072918885f497f7568c2ec8aa0bce4e66dfa310..7586c3a0c749dee1e4b027c18b33a07de86821be 100644 (file)
@@ -410,7 +410,7 @@ size_t ZSTD_CCtxParam_setParameter(
         return ERROR(parameter_unsupported);
 #else
         if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported);
-        return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_sectionSize, value);
+        return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_jobSize, value);
 #endif
 
     case ZSTD_p_overlapSizeLog :
@@ -987,15 +987,16 @@ void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx) {
 
 /*! ZSTD_copyCCtx_internal() :
  *  Duplicate an existing context `srcCCtx` into another one `dstCCtx`.
- *  The "context", in this case, refers to the hash and chain tables,
- *  entropy tables, and dictionary offsets.
  *  Only works during stage ZSTDcs_init (i.e. after creation, but before first call to ZSTD_compressContinue()).
- *  pledgedSrcSize=0 means "empty".
- *  @return : 0, or an error code */
+ *  The "context", in this case, refers to the hash and chain tables,
+ *  entropy tables, and dictionary references.
+ * `windowLog` value is enforced if != 0, otherwise value is copied from srcCCtx.
+ * @return : 0, or an error code */
 static size_t ZSTD_copyCCtx_internal(ZSTD_CCtx* dstCCtx,
                             const ZSTD_CCtx* srcCCtx,
+                            unsigned windowLog,
                             ZSTD_frameParameters fParams,
-                            unsigned long long pledgedSrcSize,
+                            U64 pledgedSrcSize,
                             ZSTD_buffered_policy_e zbuff)
 {
     DEBUGLOG(5, "ZSTD_copyCCtx_internal");
@@ -1005,6 +1006,7 @@ static size_t ZSTD_copyCCtx_internal(ZSTD_CCtx* dstCCtx,
     {   ZSTD_CCtx_params params = dstCCtx->requestedParams;
         /* Copy only compression parameters related to tables. */
         params.cParams = srcCCtx->appliedParams.cParams;
+        if (windowLog) params.cParams.windowLog = windowLog;
         params.fParams = fParams;
         ZSTD_resetCCtx_internal(dstCCtx, params, pledgedSrcSize,
                                 ZSTDcrp_noMemset, zbuff);
@@ -1050,7 +1052,9 @@ size_t ZSTD_copyCCtx(ZSTD_CCtx* dstCCtx, const ZSTD_CCtx* srcCCtx, unsigned long
     if (pledgedSrcSize==0) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
     fParams.contentSizeFlag = (pledgedSrcSize != ZSTD_CONTENTSIZE_UNKNOWN);
 
-    return ZSTD_copyCCtx_internal(dstCCtx, srcCCtx, fParams, pledgedSrcSize, zbuff);
+    return ZSTD_copyCCtx_internal(dstCCtx, srcCCtx,
+                                0 /*windowLog from srcCCtx*/, fParams, pledgedSrcSize,
+                                zbuff);
 }
 
 
@@ -2037,12 +2041,12 @@ static size_t ZSTD_compress_insertDictionary(ZSTD_CCtx* cctx,
 
 /*! ZSTD_compressBegin_internal() :
  * @return : 0, or an error code */
-static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
+size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
                              const void* dict, size_t dictSize,
                              ZSTD_dictMode_e dictMode,
                              const ZSTD_CDict* cdict,
-                                   ZSTD_CCtx_params params, U64 pledgedSrcSize,
-                                   ZSTD_buffered_policy_e zbuff)
+                             ZSTD_CCtx_params params, U64 pledgedSrcSize,
+                             ZSTD_buffered_policy_e zbuff)
 {
     DEBUGLOG(4, "ZSTD_compressBegin_internal");
     /* params are supposed to be fully validated at this point */
@@ -2052,7 +2056,7 @@ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
     if (cdict && cdict->dictContentSize>0) {
         cctx->requestedParams = params;
         return ZSTD_copyCCtx_internal(cctx, cdict->refContext,
-                                      params.fParams, pledgedSrcSize,
+                                      params.cParams.windowLog, params.fParams, pledgedSrcSize,
                                       zbuff);
     }
 
@@ -2061,17 +2065,19 @@ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
     return ZSTD_compress_insertDictionary(cctx, dict, dictSize, dictMode);
 }
 
-size_t ZSTD_compressBegin_advanced_internal(
-                                    ZSTD_CCtx* cctx,
+size_t ZSTD_compressBegin_advanced_internal(ZSTD_CCtx* cctx,
                                     const void* dict, size_t dictSize,
                                     ZSTD_dictMode_e dictMode,
+                                    const ZSTD_CDict* cdict,
                                     ZSTD_CCtx_params params,
                                     unsigned long long pledgedSrcSize)
 {
     DEBUGLOG(4, "ZSTD_compressBegin_advanced_internal");
     /* compression parameters verification and optimization */
     CHECK_F( ZSTD_checkCParams(params.cParams) );
-    return ZSTD_compressBegin_internal(cctx, dict, dictSize, dictMode, NULL,
+    return ZSTD_compressBegin_internal(cctx,
+                                       dict, dictSize, dictMode,
+                                       cdict,
                                        params, pledgedSrcSize,
                                        ZSTDb_not_buffered);
 }
@@ -2084,9 +2090,10 @@ size_t ZSTD_compressBegin_advanced(ZSTD_CCtx* cctx,
 {
     ZSTD_CCtx_params const cctxParams =
             ZSTD_assignParamsToCCtxParams(cctx->requestedParams, params);
-    return ZSTD_compressBegin_advanced_internal(cctx, dict, dictSize, ZSTD_dm_auto,
-                                                cctxParams,
-                                                pledgedSrcSize);
+    return ZSTD_compressBegin_advanced_internal(cctx,
+                                            dict, dictSize, ZSTD_dm_auto,
+                                            NULL /*cdict*/,
+                                            cctxParams, pledgedSrcSize);
 }
 
 size_t ZSTD_compressBegin_usingDict(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, int compressionLevel)
@@ -2507,10 +2514,10 @@ static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs,
     assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
 
     CHECK_F( ZSTD_compressBegin_internal(zcs,
-                                        dict, dictSize, dictMode,
-                                        cdict,
-                                        params, pledgedSrcSize,
-                                        ZSTDb_buffered) );
+                                         dict, dictSize, dictMode,
+                                         cdict,
+                                         params, pledgedSrcSize,
+                                         ZSTDb_buffered) );
 
     zcs->inToCompress = 0;
     zcs->inBuffPos = 0;
@@ -2534,7 +2541,7 @@ size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize)
 }
 
 /*! ZSTD_initCStream_internal() :
- *  Note : not static, but hidden (not exposed). Used by zstdmt_compress.c
+ *  Note : for lib/compress only. Used by zstdmt_compress.c.
  *  Assumption 1 : params are valid
  *  Assumption 2 : either dict, or cdict, is defined, not both */
 size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
@@ -2546,7 +2553,7 @@ size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
     assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
 
     if (dict && dictSize >= 8) {
-        DEBUGLOG(5, "loading dictionary of size %u", (U32)dictSize);
+        DEBUGLOG(4, "loading dictionary of size %u", (U32)dictSize);
         if (zcs->staticSize) {   /* static CCtx : never uses malloc */
             /* incompatible with internal cdict creation */
             return ERROR(memory_allocation);
@@ -2559,14 +2566,14 @@ size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
         if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
     } else {
         if (cdict) {
-            params.cParams = ZSTD_getCParamsFromCDict(cdict);  /* cParams are enforced from cdict */
+            params.cParams = ZSTD_getCParamsFromCDict(cdict);  /* cParams are enforced from cdict; this includes windowLog (should it be enforced from `params` instead ?) */
         }
         ZSTD_freeCDict(zcs->cdictLocal);
         zcs->cdictLocal = NULL;
         zcs->cdict = cdict;
     }
 
-    params.compressionLevel = ZSTD_CLEVEL_CUSTOM;
+    params.compressionLevel = ZSTD_CLEVEL_CUSTOM; /* enforce usage of cParams, instead of a dynamic derivation from cLevel (but does that happen ?) */
     zcs->requestedParams = params;
 
     return ZSTD_resetCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, zcs->cdict, params, pledgedSrcSize);
@@ -2606,10 +2613,9 @@ size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs,
                                  const void* dict, size_t dictSize,
                                  ZSTD_parameters params, unsigned long long pledgedSrcSize)
 {
-    ZSTD_CCtx_params const cctxParams =
-            ZSTD_assignParamsToCCtxParams(zcs->requestedParams, params);
+    ZSTD_CCtx_params const cctxParams = ZSTD_assignParamsToCCtxParams(zcs->requestedParams, params);
     DEBUGLOG(4, "ZSTD_initCStream_advanced: pledgedSrcSize=%u, flag=%u",
-            (U32)pledgedSrcSize, params.fParams.contentSizeFlag);
+                (U32)pledgedSrcSize, params.fParams.contentSizeFlag);
     CHECK_F( ZSTD_checkCParams(params.cParams) );
     if ((pledgedSrcSize==0) && (params.fParams.contentSizeFlag==0)) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;  /* for compatibility with older programs relying on this behavior. Users should now specify ZSTD_CONTENTSIZE_UNKNOWN. This line will be removed in the future. */
     return ZSTD_initCStream_internal(zcs, dict, dictSize, NULL /*cdict*/, cctxParams, pledgedSrcSize);
index 278f2427c59e00ae8cc0882ae3c9be6f8e3a744c..f104fe981ea53c53f690ee8ea8213c2ba0d0c4eb 100644 (file)
@@ -447,6 +447,7 @@ ZSTD_compressionParameters ZSTD_getCParamsFromCDict(const ZSTD_CDict* cdict);
 size_t ZSTD_compressBegin_advanced_internal(ZSTD_CCtx* cctx,
                                     const void* dict, size_t dictSize,
                                     ZSTD_dictMode_e dictMode,
+                                    const ZSTD_CDict* cdict,
                                     ZSTD_CCtx_params params,
                                     unsigned long long pledgedSrcSize);
 
index 8347cf29d5c078f83040c374e177d404f43d13e9..a5e996d3ebf51e2ffcb64d55a297d9a3011a07ba 100644 (file)
@@ -350,29 +350,35 @@ void ZSTDMT_compressChunk(void* jobDescription)
             goto _endJob;
         }
         job->dstBuff = dstBuff;
-        DEBUGLOG(5, "ZSTDMT_compressChunk: allocated dstBuff of size %u", (U32)dstBuff.size);
+        DEBUGLOG(5, "ZSTDMT_compressChunk: received dstBuff of size %u", (U32)dstBuff.size);
     }
 
     if (job->cdict) {
-        size_t const initError = ZSTD_compressBegin_usingCDict_advanced(cctx, job->cdict, job->params.fParams, job->fullFrameSize);
+        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize);
         DEBUGLOG(4, "ZSTDMT_compressChunk: init using CDict");
         assert(job->firstChunk);  /* only allowed for first job */
         if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
     } else {  /* srcStart points at reloaded section */
+        U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : ZSTD_CONTENTSIZE_UNKNOWN;
         ZSTD_CCtx_params jobParams = job->params;   /* do not modify job->params ! copy it, modify the copy */
         size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
-        U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : ZSTD_CONTENTSIZE_UNKNOWN;
-        /* load dictionary in "content-only" mode (no header analysis) */
-        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->srcStart, job->prefixSize, ZSTD_dm_rawContent, jobParams, pledgedSrcSize);
-        DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal called with windowLog = %u ", jobParams.cParams.windowLog);
-        if (ZSTD_isError(initError) || ZSTD_isError(forceWindowError)) {
-            job->cSize = initError;
+        if (ZSTD_isError(forceWindowError)) {
+            DEBUGLOG(5, "ZSTD_CCtxParam_setParameter error : %s ", ZSTD_getErrorName(forceWindowError));
+            job->cSize = forceWindowError;
             goto _endJob;
         }
+        /* load dictionary in "content-only" mode (no header analysis) */
+        {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->srcStart, job->prefixSize, ZSTD_dm_rawContent, NULL, jobParams, pledgedSrcSize);
+            DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal called with windowLog = %u ", jobParams.cParams.windowLog);
+            if (ZSTD_isError(initError)) {
+                DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal error : %s ", ZSTD_getErrorName(initError));
+                job->cSize = initError;
+                goto _endJob;
+        }   }
     }
     if (!job->firstChunk) {  /* flush and overwrite frame header when it's not first job */
         size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0);
-        if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
+        if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
         ZSTD_invalidateRepCodes(cctx);
     }
 
@@ -559,11 +565,13 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
 }
 
 /* Internal only */
-size_t ZSTDMT_CCtxParam_setMTCtxParameter(
-    ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value) {
+size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
+                                ZSTDMT_parameter parameter, unsigned value) {
+    DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter");
     switch(parameter)
     {
-    case ZSTDMT_p_sectionSize :
+    case ZSTDMT_p_jobSize :
+        DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %u", value);
         if ( (value > 0)  /* value==0 => automatic job size */
            & (value < ZSTDMT_JOBSIZE_MIN) )
             value = ZSTDMT_JOBSIZE_MIN;
@@ -581,9 +589,10 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(
 
 size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value)
 {
+    DEBUGLOG(4, "ZSTDMT_setMTCtxParameter");
     switch(parameter)
     {
-    case ZSTDMT_p_sectionSize :
+    case ZSTDMT_p_jobSize :
         return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
     case ZSTDMT_p_overlapSectionLog :
         return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
@@ -817,7 +826,7 @@ size_t ZSTDMT_initCStream_internal(
     zcs->targetSectionSize = params.jobSize ? params.jobSize : (size_t)1 << (params.cParams.windowLog + 2);
     if (zcs->targetSectionSize < ZSTDMT_JOBSIZE_MIN) zcs->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
     if (zcs->targetSectionSize < zcs->targetDictSize) zcs->targetSectionSize = zcs->targetDictSize;  /* job size must be >= overlap size */
-    DEBUGLOG(4, "Job Size : %u KB", (U32)(zcs->targetSectionSize>>10));
+    DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(zcs->targetSectionSize>>10), params.jobSize);
     zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize;
     DEBUGLOG(4, "inBuff Size : %u KB", (U32)(zcs->inBuffSize>>10));
     ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) );
@@ -955,6 +964,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
 static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
 {
     unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
+    DEBUGLOG(5, "ZSTDMT_flushNextJob");
     if (zcs->doneJobID == zcs->nextJobID) return 0;   /* all flushed ! */
     ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
     while (zcs->jobs[wJobID].jobCompleted==0) {
@@ -967,7 +977,8 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
     {   ZSTDMT_jobDescription job = zcs->jobs[wJobID];
         if (!job.jobScanned) {
             if (ZSTD_isError(job.cSize)) {
-                DEBUGLOG(5, "compression error detected ");
+                DEBUGLOG(5, "job %u : compression error detected : %s",
+                            zcs->doneJobID, ZSTD_getErrorName(job.cSize));
                 ZSTDMT_waitForAllJobsCompleted(zcs);
                 ZSTDMT_releaseAllJobResources(zcs);
                 return job.cSize;
@@ -1099,9 +1110,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
 static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned endFrame)
 {
     size_t const srcSize = mtctx->inBuff.filled - mtctx->dictSize;
+    DEBUGLOG(5, "ZSTDMT_flushStream_internal");
 
     if ( ((srcSize > 0) || (endFrame && !mtctx->frameEnded))
        && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) {
+           DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job");
         CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
     }
 
index 269c54b1ac8fdb668f5d8cdc4de66e2bd6a161a5..4209cf3c5ac283b470c1ddcd95c13294e5b63339 100644 (file)
@@ -84,13 +84,13 @@ ZSTDLIB_API size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
 /* ZSTDMT_parameter :
  * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */
 typedef enum {
-    ZSTDMT_p_sectionSize,        /* size of input "section". Each section is compressed in parallel. 0 means default, which is dynamically determined within compression functions */
-    ZSTDMT_p_overlapSectionLog   /* Log of overlapped section; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window */
+    ZSTDMT_p_jobSize,           /* Each job is compressed in parallel. By default, this value is dynamically determined depending on compression parameters. Can be set explicitly here. */
+    ZSTDMT_p_overlapSectionLog  /* Each job may reload a part of previous job to enhance compressionr ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window */
 } ZSTDMT_parameter;
 
 /* ZSTDMT_setMTCtxParameter() :
  * allow setting individual parameters, one at a time, among a list of enums defined in ZSTDMT_parameter.
- * The function must be called typically after ZSTD_createCCtx().
+ * The function must be called typically after ZSTD_createCCtx() but __before ZSTDMT_init*() !__
  * Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions.
  * @return : 0, or an error code (which can be tested using ZSTD_isError()) */
 ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value);
@@ -114,7 +114,7 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_param
 
 /* ZSTDMT_CCtxParam_setNbThreads()
  * Set nbThreads, and clamp it correctly,
- * but also reset jobSize and overlapLog */ 
+ * but also reset jobSize and overlapLog */
 size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads);
 
 /*! ZSTDMT_initCStream_internal() :
index 6910457ff63da8a6f52eb3c839f05c0b43b6202f..67ef6ce802fa22bac74f7e61656fca82e74687fa 100644 (file)
@@ -1263,15 +1263,15 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
             }
             {   U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
                 ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
-                DISPLAYLEVEL(5, "Init with windowLog = %u and pledgedSrcSize = %u \n",
-                    params.cParams.windowLog, (U32)pledgedSrcSize);
+                DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n",
+                    params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize);
                 params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
                 params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
                 params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1;
                 DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag);
-                CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
                 CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) );
-                CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_sectionSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) );
+                CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) );   /* custome job size */
+                CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
         }   }
 
         /* multi-segments compression test */
@@ -1288,9 +1288,9 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
                     ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 };
                     outBuff.size = outBuff.pos + dstBuffSize;
 
-                    DISPLAYLEVEL(5, "Sending %u bytes to compress \n", (U32)srcSize);
+                    DISPLAYLEVEL(6, "Sending %u bytes to compress \n", (U32)srcSize);
                     CHECK_Z( ZSTDMT_compressStream(zc, &outBuff, &inBuff) );
-                    DISPLAYLEVEL(5, "%u bytes read by ZSTDMT_compressStream \n", (U32)inBuff.pos);
+                    DISPLAYLEVEL(6, "%u bytes read by ZSTDMT_compressStream \n", (U32)inBuff.pos);
 
                     XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
                     memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
@@ -1337,10 +1337,10 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
                 size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize);
                 inBuff.size = inBuff.pos + readCSrcSize;
                 outBuff.size = outBuff.pos + dstBuffSize;
-                DISPLAYLEVEL(5, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize);
+                DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize);
                 decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
                 CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
-                DISPLAYLEVEL(5, "inBuff.pos = %u \n", (U32)readCSrcSize);
+                DISPLAYLEVEL(6, "inBuff.pos = %u \n", (U32)readCSrcSize);
             }
             CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize);
             CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);