]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Changed nbThreads for nbWorkers
authorYann Collet <cyan@fb.com>
Fri, 2 Feb 2018 03:29:30 +0000 (19:29 -0800)
committerYann Collet <cyan@fb.com>
Fri, 2 Feb 2018 03:29:30 +0000 (19:29 -0800)
This makes it easier to explain that nbWorkers=0 --> single-threaded mode,
while nbWorkers=1 --> asynchronous mode (one mode thread on top of the "main" caller thread).
No need for an additional asynchronous mode flag.
nbWorkers>=2 works the same as nbThreads>=2 previously.

15 files changed:
doc/zstd_manual.html
lib/compress/zstd_compress.c
lib/compress/zstd_compress_internal.h
lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h
lib/zstd.h
programs/bench.c
programs/bench.h
programs/fileio.c
programs/fileio.h
programs/zstdcli.c
tests/fullbench.c
tests/fuzzer.c
tests/roundTripCrash.c
tests/zstreamtest.c

index 1dea249d48489db85457906b0597ef98759938de..748568237bb2bb4894aab6c76ff5d98d60a3255c 100644 (file)
@@ -416,7 +416,7 @@ size_t ZSTD_estimateDCtxSize(void);
   It will also consider src size to be arbitrarily "large", which is worst case.
   If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation.
   ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
-  ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is > 1.
+  ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
   Note : CCtx size estimation is only correct for single-threaded compression. 
 </p></pre><BR>
 
@@ -429,7 +429,7 @@ size_t ZSTD_estimateDStreamSize_fromFrame(const void* src, size_t srcSize);
   It will also consider src size to be arbitrarily "large", which is worst case.
   If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation.
   ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
-  ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is set to a value > 1.
+  ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
   Note : CStream size estimation is only correct for single-threaded compression.
   ZSTD_DStream memory budget depends on window Size.
   This information can be passed manually, using ZSTD_estimateDStreamSize,
@@ -800,18 +800,11 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long
     </b>/* multi-threading parameters */<b>
     </b>/* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).<b>
      * They return an error otherwise. */
-    ZSTD_p_nbThreads=400,    </b>/* Select how many threads a compression job can spawn (default:1)<b>
+    ZSTD_p_nbWorkers=400,    </b>/* Select how many threads will be spawned to compress in parallel.<b>
+                              * Triggers asynchronous mode, even with nbWorkers = 1.
+                              * Can only be set to a value >= 1 if ZSTD_MULTITHREAD is enabled.
                               * More threads improve speed, but also increase memory usage.
-                              * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled.
-                              * Special: value 0 means "do not change nbThreads" */
-    ZSTD_p_nonBlockingMode,  </b>/* Single thread mode is by default "blocking" :<b>
-                              * it finishes its job as much as possible, and only then gives back control to caller.
-                              * In contrast, multi-thread is by default "non-blocking" :
-                              * it takes some input, flush some output if available, and immediately gives back control to caller.
-                              * Compression work is performed in parallel, within worker threads.
-                              * (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking)
-                              * Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected.
-                              * It allows the caller to do other tasks while the worker thread compresses in parallel. */
+                              * Default value is `0`, aka "blocking mode" : no worker is spawned, compression is performed inside Caller's thread */
     ZSTD_p_jobSize,          </b>/* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.<b>
                               * Each compression job is completed in parallel, so indirectly controls the nb of active threads.
                               * 0 means default, which is dynamically determined based on compression parameters.
@@ -823,7 +816,7 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long
     </b>/* advanced parameters - may not remain available after API update */<b>
     ZSTD_p_forceMaxWindow=1100, </b>/* Force back-reference distances to remain < windowSize,<b>
                               * even when referencing into Dictionary content (default:0) */
-    ZSTD_p_enableLongDistanceMatching=1200,  </b>/* Enable long distance matching.<b>
+    ZSTD_p_enableLongDistanceMatching=1200, </b>/* Enable long distance matching.<b>
                                          * This parameter is designed to improve the compression
                                          * ratio for large inputs with long distance matches.
                                          * This increases the memory usage as well as window size.
@@ -833,25 +826,29 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long
                                          * other LDM parameters. Setting the compression level
                                          * after this parameter overrides the window log, though LDM
                                          * will remain enabled until explicitly disabled. */
-    ZSTD_p_ldmHashLog,   </b>/* Size of the table for long distance matching, as a power of 2.<b>
-                          * Larger values increase memory usage and compression ratio, but decrease
-                          * compression speed.
-                          * Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX
-                          * (default: windowlog - 7). */
-    ZSTD_p_ldmMinMatch,  </b>/* Minimum size of searched matches for long distance matcher.<b>
-                          * Larger/too small values usually decrease compression ratio.
-                          * Must be clamped between ZSTD_LDM_MINMATCH_MIN
-                          * and ZSTD_LDM_MINMATCH_MAX (default: 64). */
-    ZSTD_p_ldmBucketSizeLog,  </b>/* Log size of each bucket in the LDM hash table for collision resolution.<b>
-                               * Larger values usually improve collision resolution but may decrease
-                               * compression speed.
-                               * The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3). */
+    ZSTD_p_ldmHashLog,       </b>/* Size of the table for long distance matching, as a power of 2.<b>
+                              * Larger values increase memory usage and compression ratio, but decrease
+                              * compression speed.
+                              * Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX
+                              * (default: windowlog - 7).
+                              * Special: value 0 means "do not change ldmHashLog". */
+    ZSTD_p_ldmMinMatch,      </b>/* Minimum size of searched matches for long distance matcher.<b>
+                              * Larger/too small values usually decrease compression ratio.
+                              * Must be clamped between ZSTD_LDM_MINMATCH_MIN
+                              * and ZSTD_LDM_MINMATCH_MAX (default: 64).
+                              * Special: value 0 means "do not change ldmMinMatch". */
+    ZSTD_p_ldmBucketSizeLog, </b>/* Log size of each bucket in the LDM hash table for collision resolution.<b>
+                              * Larger values usually improve collision resolution but may decrease
+                              * compression speed.
+                              * The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3).
+                              * note : 0 is a valid value */
     ZSTD_p_ldmHashEveryLog,  </b>/* Frequency of inserting/looking up entries in the LDM hash table.<b>
                               * The default is MAX(0, (windowLog - ldmHashLog)) to
                               * optimize hash table usage.
                               * Larger values improve compression speed. Deviating far from the
                               * default value will likely result in a decrease in compression ratio.
-                              * Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN. */
+                              * Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN.
+                              * note : 0 is a valid value */
 
 } ZSTD_cParameter;
 </b></pre><BR>
@@ -1000,7 +997,7 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t
 </p></pre><BR>
 
 <pre><b>size_t ZSTD_resetCCtxParams(ZSTD_CCtx_params* params);
-</b><p>  Reset params to default, with the default compression level.
+</b><p>  Reset params to default values.
  
 </p></pre><BR>
 
@@ -1030,7 +1027,7 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t
 </b><p>  Apply a set of ZSTD_CCtx_params to the compression context.
   This must be done before the dictionary is loaded.
   The pledgedSrcSize is treated as unknown.
-  Multithreading parameters are applied only if nbThreads > 1.
+  Multithreading parameters are applied only if nbWorkers >= 1.
  
 </p></pre><BR>
 
index 18da77f8de207658160ce10809fd72fadee82226..e66b4df8a4a36113d7643f6375a18d30329ae95d 100644 (file)
@@ -281,13 +281,12 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
                                    * default : 0 when using a CDict, 1 when using a Prefix */
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
-    case ZSTD_p_nbThreads:
-        if ((value > 1) && cctx->staticSize) {
+    case ZSTD_p_nbWorkers:
+        if ((value>0) && cctx->staticSize) {
             return ERROR(parameter_unsupported);  /* MT not compatible with static alloc */
         }
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
-    case ZSTD_p_nonBlockingMode:
     case ZSTD_p_jobSize:
     case ZSTD_p_overlapSizeLog:
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
@@ -403,21 +402,12 @@ size_t ZSTD_CCtxParam_setParameter(
         CCtxParams->forceWindow = (value > 0);
         return CCtxParams->forceWindow;
 
-    case ZSTD_p_nbThreads :
-        if (value == 0) return CCtxParams->nbThreads;
+    case ZSTD_p_nbWorkers :
 #ifndef ZSTD_MULTITHREAD
-        if (value > 1) return ERROR(parameter_unsupported);
-        return 1;
-#else
-        return ZSTDMT_CCtxParam_setNbThreads(CCtxParams, value);
-#endif
-
-    case ZSTD_p_nonBlockingMode :
-#ifndef ZSTD_MULTITHREAD
-        return ERROR(parameter_unsupported);
+        if (value > 0) return ERROR(parameter_unsupported);
+        return 0;
 #else
-        CCtxParams->nonBlockingMode = (value>0);
-        return CCtxParams->nonBlockingMode;
+        return ZSTDMT_CCtxParam_setNbWorkers(CCtxParams, value);
 #endif
 
     case ZSTD_p_jobSize :
@@ -489,7 +479,7 @@ size_t ZSTD_CCtx_setParametersUsingCCtxParams(
     cctx->requestedParams = *params;
 #ifdef ZSTD_MULTITHREAD
     if (cctx->mtctx)
-        ZSTDMT_MTCtx_setParametersUsingCCtxParams(cctx->mtctx, params);
+        ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(cctx->mtctx, params);
 #endif
 
     return 0;
@@ -687,7 +677,7 @@ static size_t ZSTD_sizeof_matchState(ZSTD_compressionParameters const* cParams,
 size_t ZSTD_estimateCCtxSize_usingCCtxParams(const ZSTD_CCtx_params* params)
 {
     /* Estimate CCtx size is supported for single-threaded compression only. */
-    if (params->nbThreads > 1) { return ERROR(GENERIC); }
+    if (params->nbWorkers > 0) { return ERROR(GENERIC); }
     {   ZSTD_compressionParameters const cParams =
                 ZSTD_getCParamsFromCCtxParams(*params, 0, 0);
         size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << cParams.windowLog);
@@ -736,7 +726,7 @@ size_t ZSTD_estimateCCtxSize(int compressionLevel)
 
 size_t ZSTD_estimateCStreamSize_usingCCtxParams(const ZSTD_CCtx_params* params)
 {
-    if (params->nbThreads > 1) { return ERROR(GENERIC); }
+    if (params->nbWorkers > 0) { return ERROR(GENERIC); }
     {   size_t const CCtxSize = ZSTD_estimateCCtxSize_usingCCtxParams(params);
         size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << params->cParams.windowLog);
         size_t const inBuffSize = ((size_t)1 << params->cParams.windowLog) + blockSize;
@@ -775,7 +765,7 @@ size_t ZSTD_estimateCStreamSize(int compressionLevel) {
 ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
 {
 #ifdef ZSTD_MULTITHREAD
-    if ((cctx->appliedParams.nbThreads > 1) || (cctx->appliedParams.nonBlockingMode)) {
+    if (cctx->appliedParams.nbWorkers > 0) {
         return ZSTDMT_getFrameProgression(cctx->mtctx);
     }
 #endif
@@ -3166,28 +3156,26 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
 
 #ifdef ZSTD_MULTITHREAD
         if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) {
-            params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */
-            params.nonBlockingMode = 0;
+            params.nbWorkers = 0; /* do not invoke multi-threading when src size is too small */
         }
-        if ((params.nbThreads > 1) | (params.nonBlockingMode == 1)) {
-            if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) {
-                DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u",
-                            params.nbThreads);
+        if (params.nbWorkers > 0) {
+            if (cctx->mtctx == NULL || (params.nbWorkers != ZSTDMT_getNbWorkers(cctx->mtctx))) {
+                DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbWorkers=%u",
+                            params.nbWorkers);
                 if (cctx->mtctx != NULL)
-                    DEBUGLOG(4, "ZSTD_compress_generic: previous nbThreads was %u",
-                                ZSTDMT_getNbThreads(cctx->mtctx));
+                    DEBUGLOG(4, "ZSTD_compress_generic: previous nbWorkers was %u",
+                                ZSTDMT_getNbWorkers(cctx->mtctx));
                 ZSTDMT_freeCCtx(cctx->mtctx);
-                cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem);
+                cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbWorkers, cctx->customMem);
                 if (cctx->mtctx == NULL) return ERROR(memory_allocation);
             }
-            DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbThreads=%u", params.nbThreads);
+            DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbWorkers=%u", params.nbWorkers);
             CHECK_F( ZSTDMT_initCStream_internal(
                         cctx->mtctx,
                         prefixDict.dict, prefixDict.dictSize, ZSTD_dm_rawContent,
                         cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) );
             cctx->streamStage = zcss_load;
-            cctx->appliedParams.nbThreads = params.nbThreads;
-            cctx->appliedParams.nonBlockingMode = params.nonBlockingMode;
+            cctx->appliedParams.nbWorkers = params.nbWorkers;
         } else
 #endif
         {   CHECK_F( ZSTD_resetCStream_internal(
@@ -3195,12 +3183,12 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
                              prefixDict.dictMode, cctx->cdict, params,
                              cctx->pledgedSrcSizePlusOne-1) );
             assert(cctx->streamStage == zcss_load);
-            assert(cctx->appliedParams.nbThreads <= 1);
+            assert(cctx->appliedParams.nbWorkers == 0);
     }   }
 
     /* compression stage */
 #ifdef ZSTD_MULTITHREAD
-    if ((cctx->appliedParams.nbThreads > 1) | (cctx->appliedParams.nonBlockingMode==1)) {
+    if (cctx->appliedParams.nbWorkers > 0) {
         size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
         if ( ZSTD_isError(flushMin)
           || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
index cf5936587deab32defee7a2df366abba7998e4b9..eb651fb5e09a76fdacc6e6e5884a3cc84ef2505e 100644 (file)
@@ -145,12 +145,11 @@ struct ZSTD_CCtx_params_s {
     ZSTD_frameParameters fParams;
 
     int compressionLevel;
-    U32 forceWindow;           /* force back-references to respect limit of
+    int forceWindow;           /* force back-references to respect limit of
                                 * 1<<wLog, even for dictionary */
 
     /* Multithreading: used to pass parameters to mtctx */
-    U32 nbThreads;
-    int nonBlockingMode;      /* will trigger ZSTDMT even with nbThreads==1 */
+    unsigned nbWorkers;
     unsigned jobSize;
     unsigned overlapSizeLog;
 
index 32390a61ce35e855b659913d55a22c66ff9b2a3a..90084b9b3a03f992d2a27e1e2520e9825d7c6e12 100644 (file)
@@ -10,7 +10,7 @@
 
 
 /* ======   Tuning parameters   ====== */
-#define ZSTDMT_NBTHREADS_MAX 200
+#define ZSTDMT_NBWORKERS_MAX 200
 #define ZSTDMT_JOBSIZE_MAX  (MEM_32bits() ? (512 MB) : (2 GB))  /* note : limited by `jobSize` type, which is `unsigned` */
 #define ZSTDMT_OVERLAPLOG_DEFAULT 6
 
@@ -97,9 +97,9 @@ typedef struct ZSTDMT_bufferPool_s {
     buffer_t bTable[1];   /* variable size */
 } ZSTDMT_bufferPool;
 
-static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_customMem cMem)
+static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbWorkers, ZSTD_customMem cMem)
 {
-    unsigned const maxNbBuffers = 2*nbThreads + 3;
+    unsigned const maxNbBuffers = 2*nbWorkers + 3;
     ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc(
         sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
     if (bufPool==NULL) return NULL;
@@ -236,23 +236,24 @@ static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
 }
 
 /* ZSTDMT_createCCtxPool() :
- * implies nbThreads >= 1 , checked by caller ZSTDMT_createCCtx() */
-static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads,
+ * implies nbWorkers >= 1 , checked by caller ZSTDMT_createCCtx() */
+static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbWorkers,
                                               ZSTD_customMem cMem)
 {
     ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc(
-        sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*), cMem);
+        sizeof(ZSTDMT_CCtxPool) + (nbWorkers-1)*sizeof(ZSTD_CCtx*), cMem);
+    assert(nbWorkers > 0);
     if (!cctxPool) return NULL;
     if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) {
         ZSTD_free(cctxPool, cMem);
         return NULL;
     }
     cctxPool->cMem = cMem;
-    cctxPool->totalCCtx = nbThreads;
+    cctxPool->totalCCtx = nbWorkers;
     cctxPool->availCCtx = 1;   /* at least one cctx for single-thread mode */
     cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem);
     if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }
-    DEBUGLOG(3, "cctxPool created, with %u threads", nbThreads);
+    DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers);
     return cctxPool;
 }
 
@@ -260,15 +261,16 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads,
 static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
 {
     ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
-    {   unsigned const nbThreads = cctxPool->totalCCtx;
+    {   unsigned const nbWorkers = cctxPool->totalCCtx;
         size_t const poolSize = sizeof(*cctxPool)
-                                + (nbThreads-1)*sizeof(ZSTD_CCtx*);
+                                + (nbWorkers-1) * sizeof(ZSTD_CCtx*);
         unsigned u;
         size_t totalCCtxSize = 0;
-        for (u=0; u<nbThreads; u++) {
+        for (u=0; u<nbWorkers; u++) {
             totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]);
         }
         ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
+        assert(nbWorkers > 0);
         return poolSize + totalCCtxSize;
     }
 }
@@ -295,8 +297,8 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
     if (pool->availCCtx < pool->totalCCtx)
         pool->cctx[pool->availCCtx++] = cctx;
     else {
-        /* pool overflow : should not happen, since totalCCtx==nbThreads */
-        DEBUGLOG(5, "CCtx pool overflow : free cctx");
+        /* pool overflow : should not happen, since totalCCtx==nbWorkers */
+        DEBUGLOG(4, "CCtx pool overflow : free cctx");
         ZSTD_freeCCtx(cctx);
     }
     ZSTD_pthread_mutex_unlock(&pool->poolMutex);
@@ -502,52 +504,52 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom
     return jobTable;
 }
 
-/* ZSTDMT_CCtxParam_setNbThreads():
+/* ZSTDMT_CCtxParam_setNbWorkers():
  * Internal use only */
-size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads)
+size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
 {
-    if (nbThreads > ZSTDMT_NBTHREADS_MAX) nbThreads = ZSTDMT_NBTHREADS_MAX;
-    if (nbThreads < 1) nbThreads = 1;
-    params->nbThreads = nbThreads;
+    if (nbWorkers > ZSTDMT_NBWORKERS_MAX) nbWorkers = ZSTDMT_NBWORKERS_MAX;
+    if (nbWorkers < 1) nbWorkers = 1;
+    params->nbWorkers = nbWorkers;
     params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT;
     params->jobSize = 0;
-    return nbThreads;
+    return nbWorkers;
 }
 
-ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
+ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
 {
     ZSTDMT_CCtx* mtctx;
-    U32 nbJobs = nbThreads + 2;
-    DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbThreads = %u)", nbThreads);
+    U32 nbJobs = nbWorkers + 2;
+    DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbWorkers = %u)", nbWorkers);
 
-    if (nbThreads < 1) return NULL;
-    nbThreads = MIN(nbThreads , ZSTDMT_NBTHREADS_MAX);
+    if (nbWorkers < 1) return NULL;
+    nbWorkers = MIN(nbWorkers , ZSTDMT_NBWORKERS_MAX);
     if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL))
         /* invalid custom allocator */
         return NULL;
 
     mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem);
     if (!mtctx) return NULL;
-    ZSTDMT_CCtxParam_setNbThreads(&mtctx->params, nbThreads);
+    ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);
     mtctx->cMem = cMem;
     mtctx->allJobsCompleted = 1;
-    mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem);
+    mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem);
     mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
     assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0);  /* ensure nbJobs is a power of 2 */
     mtctx->jobIDMask = nbJobs - 1;
-    mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem);
-    mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem);
+    mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
+    mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
     if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) {
         ZSTDMT_freeCCtx(mtctx);
         return NULL;
     }
-    DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads);
+    DEBUGLOG(3, "mt_cctx created, for %u threads", nbWorkers);
     return mtctx;
 }
 
-ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads)
+ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers)
 {
-    return ZSTDMT_createCCtx_advanced(nbThreads, ZSTD_defaultCMem);
+    return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem);
 }
 
 
@@ -649,8 +651,8 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter,
     }
 }
 
-/* Sets parameters relevant to the compression job, initializing others to
- * default values. Notably, nbThreads should probably be zero. */
+/* Sets parameters relevant to the compression job,
+ * initializing others to default values. */
 static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
 {
     ZSTD_CCtx_params jobParams;
@@ -664,7 +666,7 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
     return jobParams;
 }
 
-/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams() :
+/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing() :
  *  Apply a ZSTD_CCtx_params to the compression context.
  *  This entry point is accessed while compression is ongoing,
  *  new parameters will be applied to next compression job.
@@ -675,21 +677,23 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
  *  - job size
  *  - overlap size
  */
-void ZSTDMT_MTCtx_setParametersUsingCCtxParams(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params)
+void ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params)
 {
     U32 const wlog = mtctx->params.cParams.windowLog;
+    U32 const nbWorkers = mtctx->params.nbWorkers;
     mtctx->params = *params;
-    mtctx->params.cParams.windowLog = wlog;  /* Do not modify windowLog ! */
+    mtctx->params.cParams.windowLog = wlog;  /* Do not modify windowLog ! Frame must keep same wlog during the whole process ! */
+    mtctx->params.nbWorkers = nbWorkers;     /* Do not modify nbWorkers, it must remain synchronized with CCtx Pool ! */
     /* note : other parameters not updated are simply not used beyond initialization */
 }
 
-/* ZSTDMT_getNbThreads():
+/* ZSTDMT_getNbWorkers():
  * @return nb threads currently active in mtctx.
  * mtctx must be valid */
-unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
+unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx)
 {
     assert(mtctx != NULL);
-    return mtctx->params.nbThreads;
+    return mtctx->params.nbWorkers;
 }
 
 /* ZSTDMT_getFrameProgression():
@@ -728,15 +732,15 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 /* =====   Multi-threaded compression   ===== */
 /* ------------------------------------------ */
 
-static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbThreads) {
-    assert(nbThreads>0);
+static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbWorkers) {
+    assert(nbWorkers>0);
     {   size_t const jobSizeTarget = (size_t)1 << (windowLog + 2);
         size_t const jobMaxSize = jobSizeTarget << 2;
-        size_t const passSizeMax = jobMaxSize * nbThreads;
+        size_t const passSizeMax = jobMaxSize * nbWorkers;
         unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1;
-        unsigned const nbJobsLarge = multiplier * nbThreads;
+        unsigned const nbJobsLarge = multiplier * nbWorkers;
         unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1;
-        unsigned const nbJobsSmall = MIN(nbJobsMax, nbThreads);
+        unsigned const nbJobsSmall = MIN(nbJobsMax, nbWorkers);
         return (multiplier>1) ? nbJobsLarge : nbJobsSmall;
 }   }
 
@@ -753,7 +757,7 @@ static size_t ZSTDMT_compress_advanced_internal(
     ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params);
     unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
     size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
-    unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbThreads);
+    unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbWorkers);
     size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs;
     size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize;   /* avoid too small last block */
     const char* const srcStart = (const char*)src;
@@ -761,13 +765,13 @@ static size_t ZSTDMT_compress_advanced_internal(
     unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize));  /* presumes avgJobSize >= 256 KB, which should be the case */
     size_t frameStartPos = 0, dstBufferPos = 0;
     XXH64_state_t xxh64;
-    assert(jobParams.nbThreads == 0);
-    assert(mtctx->cctxPool->totalCCtx == params.nbThreads);
+    assert(jobParams.nbWorkers == 0);
+    assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
 
     DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ",
                 nbJobs, (U32)proposedJobSize, (U32)avgJobSize);
 
-    if ((nbJobs==1) | (params.nbThreads<=1)) {   /* fallback to single-thread mode : this is a blocking invocation anyway */
+    if ((nbJobs==1) | (params.nbWorkers<=1)) {   /* fallback to single-thread mode : this is a blocking invocation anyway */
         ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
         if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
         return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams);
@@ -911,11 +915,12 @@ size_t ZSTDMT_initCStream_internal(
         const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
         unsigned long long pledgedSrcSize)
 {
-    DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u)", (U32)pledgedSrcSize);
+    DEBUGLOG(2, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)",
+                (U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx);
     /* params are supposed to be fully validated at this point */
     assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
     assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
-    assert(mtctx->cctxPool->totalCCtx == params.nbThreads);
+    assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
     mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN);  /* do not trigger multi-threading when srcSize is too small */
     if (params.jobSize == 0) {
         if (params.cParams.windowLog >= 29)
@@ -928,12 +933,12 @@ size_t ZSTDMT_initCStream_internal(
     if (mtctx->singleBlockingThread) {
         ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params);
         DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode");
-        assert(singleThreadParams.nbThreads == 0);
+        assert(singleThreadParams.nbWorkers == 0);
         return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0],
                                          dict, dictSize, cdict,
                                          singleThreadParams, pledgedSrcSize);
     }
-    DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u threads", params.nbThreads);
+    DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers);
 
     if (mtctx->allJobsCompleted == 0) {   /* previous compression not correctly finished */
         ZSTDMT_waitForAllJobsCompleted(mtctx);
@@ -1012,8 +1017,6 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
 size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize)
 {
     if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
-    if (mtctx->params.nbThreads==1)
-        return ZSTD_resetCStream(mtctx->cctxPool->cctx[0], pledgedSrcSize);
     return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, 0, mtctx->params,
                                        pledgedSrcSize);
 }
index dcff1138ae21a4785942601be1449034546021c9..3fef2ed6a7d0c5a75b2717a9492f9338b16002c6 100644 (file)
@@ -30,8 +30,8 @@
 
 /* ===   Memory management   === */
 typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx;
-ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads);
-ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads,
+ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers);
+ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers,
                                                     ZSTD_customMem cMem);
 ZSTDLIB_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx);
 
@@ -116,12 +116,12 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
 
 size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value);
 
-/* ZSTDMT_CCtxParam_setNbThreads()
- * Set nbThreads, and clamp it.
+/* ZSTDMT_CCtxParam_setNbWorkers()
+ * Set nbWorkers, and clamp it.
  * Also reset jobSize and overlapLog */
-size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads);
+size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers);
 
-/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams() :
+/*! ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing() :
  *  Apply a ZSTD_CCtx_params to the compression context.
  *  This works even during compression, and will be applied to next compression job.
  *  However, the following parameters will NOT be updated after compression has been started :
@@ -131,12 +131,12 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread
  *  - job size
  *  - overlap size
  */
-void ZSTDMT_MTCtx_setParametersUsingCCtxParams(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params);
+void ZSTDMT_MTCtx_setParametersUsingCCtxParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* params);
 
-/* ZSTDMT_getNbThreads():
+/* ZSTDMT_getNbWorkers():
  * @return nb threads currently active in mtctx.
  * mtctx must be valid */
-unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx);
+unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx);
 
 /* ZSTDMT_getFrameProgression():
  * tells how much data has been consumed (input) and produced (output) for current frame.
index c3630eb0010f7e2d6afb7f1b7690b46cadc9249a..be93a7eeb7e99da241a5b22aa00a1c2d85571c2f 100644 (file)
@@ -506,7 +506,7 @@ ZSTDLIB_API size_t ZSTD_sizeof_DDict(const ZSTD_DDict* ddict);
  *  It will also consider src size to be arbitrarily "large", which is worst case.
  *  If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation.
  *  ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
- *  ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is > 1.
+ *  ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
  *  Note : CCtx size estimation is only correct for single-threaded compression. */
 ZSTDLIB_API size_t ZSTD_estimateCCtxSize(int compressionLevel);
 ZSTDLIB_API size_t ZSTD_estimateCCtxSize_usingCParams(ZSTD_compressionParameters cParams);
@@ -518,7 +518,7 @@ ZSTDLIB_API size_t ZSTD_estimateDCtxSize(void);
  *  It will also consider src size to be arbitrarily "large", which is worst case.
  *  If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation.
  *  ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
- *  ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is set to a value > 1.
+ *  ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
  *  Note : CStream size estimation is only correct for single-threaded compression.
  *  ZSTD_DStream memory budget depends on window Size.
  *  This information can be passed manually, using ZSTD_estimateDStreamSize,
@@ -992,18 +992,13 @@ typedef enum {
     /* multi-threading parameters */
     /* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).
      * They return an error otherwise. */
-    ZSTD_p_nbThreads=400,    /* Select how many threads a compression job can spawn (default:1)
-                              * More threads improve speed, but also increase memory usage.
-                              * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled.
-                              * Special: value 0 means "do not change nbThreads" */
-    ZSTD_p_nonBlockingMode,  /* Single thread mode is by default "blocking" :
-                              * it finishes its job as much as possible, and only then gives back control to caller.
-                              * In contrast, multi-thread is by default "non-blocking" :
-                              * it takes some input, flush some output if available, and immediately gives back control to caller.
-                              * Compression work is performed in parallel, within worker threads.
-                              * (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking)
-                              * Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected.
-                              * It allows the caller to do other tasks while the worker thread compresses in parallel. */
+    ZSTD_p_nbWorkers=400,    /* Select how many threads will be spawned to compress in parallel.
+                              * When nbWorkers >= 1, triggers asynchronous mode :
+                              * ZSTD_compress_generic() consumes some input, flush some output if possible, and immediately gives back control to caller,
+                              * while compression work is performed in parallel, within worker threads.
+                              * (note : a strong exception to this rule is when first invocation sets ZSTD_e_end : it becomes a blocking call).
+                              * More workers improve speed, but also increase memory usage.
+                              * Default value is `0`, aka "single-threaded mode" : no worker is spawned, compression is performed inside Caller's thread, all invocations are blocking */
     ZSTD_p_jobSize,          /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.
                               * Each compression job is completed in parallel, so indirectly controls the nb of active threads.
                               * 0 means default, which is dynamically determined based on compression parameters.
@@ -1231,9 +1226,10 @@ ZSTDLIB_API size_t ZSTD_CCtxParam_setParameter(ZSTD_CCtx_params* params, ZSTD_cP
 
 /*! ZSTD_CCtx_setParametersUsingCCtxParams() :
  *  Apply a set of ZSTD_CCtx_params to the compression context.
- *  This must be done before the dictionary is loaded.
- *  The pledgedSrcSize is treated as unknown.
- *  Multithreading parameters are applied only if nbThreads > 1.
+ *  This can be done even after compression is started,
+ *    if nbWorkers==0, this will have no impact until a new compression is started.
+ *    if nbWorkers>=1, new parameters will be picked up at next job,
+ *       with a few restrictions (windowLog, pledgedSrcSize, nbWorkers, jobSize, and overlapLog are not updated).
  */
 ZSTDLIB_API size_t ZSTD_CCtx_setParametersUsingCCtxParams(
         ZSTD_CCtx* cctx, const ZSTD_CCtx_params* params);
index 843920c89d3c3cbeffe5634d88663d676f694638..bf3dcb47d571bb24de69009464843e4d75255137 100644 (file)
@@ -122,12 +122,12 @@ void BMK_setBlockSize(size_t blockSize)
 
 void BMK_setDecodeOnlyMode(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); }
 
-static U32 g_nbThreads = 1;
-void BMK_setNbThreads(unsigned nbThreads) {
+static U32 g_nbWorkers = 0;
+void BMK_setNbWorkers(unsigned nbWorkers) {
 #ifndef ZSTD_MULTITHREAD
-    if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
+    if (nbWorkers > 0) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
 #endif
-    g_nbThreads = nbThreads;
+    g_nbWorkers = nbWorkers;
 }
 
 static U32 g_realTime = 0;
@@ -295,7 +295,7 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
                 if (!cCompleted) {   /* still some time to do compression tests */
                     U64 const clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1;
                     U32 nbLoops = 0;
-                    ZSTD_CCtx_setParameter(ctx, ZSTD_p_nbThreads, g_nbThreads);
+                    ZSTD_CCtx_setParameter(ctx, ZSTD_p_nbWorkers, g_nbWorkers);
                     ZSTD_CCtx_setParameter(ctx, ZSTD_p_compressionLevel, cLevel);
                     ZSTD_CCtx_setParameter(ctx, ZSTD_p_enableLongDistanceMatching, g_ldmFlag);
                     ZSTD_CCtx_setParameter(ctx, ZSTD_p_ldmMinMatch, g_ldmMinMatch);
index d2ec94ee36888ea0662e60380d9d999b9daa9bf5..bf1087013feb1686b76889ca6c185c4cd0add598 100644 (file)
@@ -22,7 +22,7 @@ int BMK_benchFiles(const char** fileNamesTable, unsigned nbFiles, const char* di
 /* Set Parameters */
 void BMK_setNbSeconds(unsigned nbLoops);
 void BMK_setBlockSize(size_t blockSize);
-void BMK_setNbThreads(unsigned nbThreads);
+void BMK_setNbWorkers(unsigned nbWorkers);
 void BMK_setRealTime(unsigned priority);
 void BMK_setNotificationLevel(unsigned level);
 void BMK_setSeparateFiles(unsigned separate);
index 8bfdc20ee0d26e42d68c008e292fd6eaf08b737f..bd880f30a0e648dac2fb1c38fb9b1e8481681697 100644 (file)
@@ -218,23 +218,23 @@ static U32 g_removeSrcFile = 0;
 void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); }
 static U32 g_memLimit = 0;
 void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; }
-static U32 g_nbThreads = 1;
-void FIO_setNbThreads(unsigned nbThreads) {
+static U32 g_nbWorkers = 1;
+void FIO_setNbWorkers(unsigned nbWorkers) {
 #ifndef ZSTD_MULTITHREAD
-    if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
+    if (nbWorkers > 0) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
 #endif
-    g_nbThreads = nbThreads;
+    g_nbWorkers = nbWorkers;
 }
 static U32 g_blockSize = 0;
 void FIO_setBlockSize(unsigned blockSize) {
-    if (blockSize && g_nbThreads==1)
+    if (blockSize && g_nbWorkers==0)
         DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n");
     g_blockSize = blockSize;
 }
 #define FIO_OVERLAP_LOG_NOTSET 9999
 static U32 g_overlapLog = FIO_OVERLAP_LOG_NOTSET;
 void FIO_setOverlapLog(unsigned overlapLog){
-    if (overlapLog && g_nbThreads==1)
+    if (overlapLog && g_nbWorkers==0)
         DISPLAYLEVEL(2, "Setting overlapLog is useless in single-thread mode \n");
     g_overlapLog = overlapLog;
 }
@@ -461,9 +461,8 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
         CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionStrategy, (U32)comprParams->strategy) );
         /* multi-threading */
 #ifdef ZSTD_MULTITHREAD
-        DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads);
-        CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) );
-        CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nonBlockingMode, 1) );
+        DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbWorkers);
+        CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbWorkers, g_nbWorkers) );
 #endif
         /* dictionary */
         CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) );  /* set the value temporarily for dictionary loading, to adapt compression parameters */
index 9b9c7ea2fadcec37852c78d5300d673c79897f10..69c83f71dce3d375d6fc77bc2d01eef4c0203b53 100644 (file)
@@ -54,7 +54,7 @@ void FIO_setDictIDFlag(unsigned dictIDFlag);
 void FIO_setChecksumFlag(unsigned checksumFlag);
 void FIO_setRemoveSrcFile(unsigned flag);
 void FIO_setMemLimit(unsigned memLimit);
-void FIO_setNbThreads(unsigned nbThreads);
+void FIO_setNbWorkers(unsigned nbWorkers);
 void FIO_setBlockSize(unsigned blockSize);
 void FIO_setOverlapLog(unsigned overlapLog);
 void FIO_setLdmFlag(unsigned ldmFlag);
index c80c36acfa99c437872fa19d8772d1e7f11540ae..015dc5e4c77a1c2626322a3176e20253896cb96a 100644 (file)
@@ -377,7 +377,7 @@ int main(int argCount, const char* argv[])
         nextArgumentsAreFiles=0,
         ultra=0,
         lastCommand = 0,
-        nbThreads = 1,
+        nbWorkers = 1,
         setRealTimePrio = 0,
         separateFiles = 0,
         ldmFlag = 0;
@@ -422,7 +422,7 @@ int main(int argCount, const char* argv[])
     programName = lastNameFromPath(programName);
 
     /* preset behaviors */
-    if (exeNameMatch(programName, ZSTD_ZSTDMT)) nbThreads=0;
+    if (exeNameMatch(programName, ZSTD_ZSTDMT)) nbWorkers=0;
     if (exeNameMatch(programName, ZSTD_UNZSTD)) operation=zom_decompress;
     if (exeNameMatch(programName, ZSTD_CAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; }   /* supports multiple formats */
     if (exeNameMatch(programName, ZSTD_ZCAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; }  /* behave like zcat, also supports multiple formats */
@@ -515,7 +515,7 @@ int main(int argCount, const char* argv[])
                       continue;
                     }
 #endif
-                    if (longCommandWArg(&argument, "--threads=")) { nbThreads = readU32FromChar(&argument); continue; }
+                    if (longCommandWArg(&argument, "--threads=")) { nbWorkers = readU32FromChar(&argument); continue; }
                     if (longCommandWArg(&argument, "--memlimit=")) { memLimit = readU32FromChar(&argument); continue; }
                     if (longCommandWArg(&argument, "--memory=")) { memLimit = readU32FromChar(&argument); continue; }
                     if (longCommandWArg(&argument, "--memlimit-decompress=")) { memLimit = readU32FromChar(&argument); continue; }
@@ -648,7 +648,7 @@ int main(int argCount, const char* argv[])
                         /* nb of threads (hidden option) */
                     case 'T':
                         argument++;
-                        nbThreads = readU32FromChar(&argument);
+                        nbWorkers = readU32FromChar(&argument);
                         break;
 
                         /* Dictionary Selection level */
@@ -716,10 +716,10 @@ int main(int argCount, const char* argv[])
     /* Welcome message (if verbose) */
     DISPLAYLEVEL(3, WELCOME_MESSAGE);
 
-    if (nbThreads == 0) {
-        /* try to guess */
-        nbThreads = UTIL_countPhysicalCores();
-        DISPLAYLEVEL(3, "Note: %d physical core(s) detected \n", nbThreads);
+    if (nbWorkers == 0) {
+        /* automatically set # workers based on # of reported cpus */
+        nbWorkers = UTIL_countPhysicalCores();
+        DISPLAYLEVEL(3, "Note: %d physical core(s) detected \n", nbWorkers);
     }
 
     g_utilDisplayLevel = g_displayLevel;
@@ -763,7 +763,7 @@ int main(int argCount, const char* argv[])
         BMK_setNotificationLevel(g_displayLevel);
         BMK_setSeparateFiles(separateFiles);
         BMK_setBlockSize(blockSize);
-        BMK_setNbThreads(nbThreads);
+        BMK_setNbWorkers(nbWorkers);
         BMK_setRealTime(setRealTimePrio);
         BMK_setNbSeconds(bench_nbSeconds);
         BMK_setLdmFlag(ldmFlag);
@@ -791,7 +791,7 @@ int main(int argCount, const char* argv[])
         zParams.dictID = dictID;
         if (cover) {
             int const optimize = !coverParams.k || !coverParams.d;
-            coverParams.nbThreads = nbThreads;
+            coverParams.nbThreads = nbWorkers;
             coverParams.zParams = zParams;
             operationResult = DiB_trainFromFiles(outFileName, maxDictSize, filenameTable, filenameIdx, blockSize, NULL, &coverParams, optimize);
         } else {
@@ -835,7 +835,7 @@ int main(int argCount, const char* argv[])
     FIO_setNotificationLevel(g_displayLevel);
     if (operation==zom_compress) {
 #ifndef ZSTD_NOCOMPRESS
-        FIO_setNbThreads(nbThreads);
+        FIO_setNbWorkers(nbWorkers);
         FIO_setBlockSize((U32)blockSize);
         FIO_setLdmFlag(ldmFlag);
         FIO_setLdmHashLog(g_ldmHashLog);
index 2ec3ce5554e2025a295503c303bf0e5c991311da..199f4966b614e38922926a871e96c3527f3f1643 100644 (file)
@@ -181,7 +181,7 @@ static size_t local_ZSTD_compress_generic_T2_end(void* dst, size_t dstCapacity,
     ZSTD_inBuffer buffIn;
     (void)buff2;
     ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1);
-    ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbThreads, 2);
+    ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbWorkers, 2);
     buffOut.dst = dst;
     buffOut.size = dstCapacity;
     buffOut.pos = 0;
@@ -198,7 +198,7 @@ static size_t local_ZSTD_compress_generic_T2_continue(void* dst, size_t dstCapac
     ZSTD_inBuffer buffIn;
     (void)buff2;
     ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1);
-    ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbThreads, 2);
+    ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbWorkers, 2);
     buffOut.dst = dst;
     buffOut.size = dstCapacity;
     buffOut.pos = 0;
index 3982f8bbc2bab8418995cd841dab10bf0c581d44..f42b8b7a7a20acb75f86cbb8199f918c0a04309f 100644 (file)
@@ -226,7 +226,7 @@ static int FUZ_mallocTests(unsigned seed, double compressibility, unsigned part)
                 ZSTD_outBuffer out = { outBuffer, outSize, 0 };
                 ZSTD_inBuffer in = { inBuffer, inSize, 0 };
                 CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) );
-                CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) );
+                CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, nbThreads) );
                 while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {}
                 ZSTD_freeCCtx(cctx);
                 DISPLAYLEVEL(3, "compress_generic,-T%u,end level %i : ",
@@ -246,7 +246,7 @@ static int FUZ_mallocTests(unsigned seed, double compressibility, unsigned part)
                 ZSTD_outBuffer out = { outBuffer, outSize, 0 };
                 ZSTD_inBuffer in = { inBuffer, inSize, 0 };
                 CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) );
-                CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) );
+                CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, nbThreads) );
                 CHECK_Z( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_continue) );
                 while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {}
                 ZSTD_freeCCtx(cctx);
index 180fa9b6f6dcdc1459ea2605016414ca089ef93e..7d937fceebc01433723e9b20c780bc8c4ed3e288 100644 (file)
@@ -94,7 +94,7 @@ static size_t cctxParamRoundTripTest(void* resultBuff, size_t resultBuffCapacity
 
     /* Set parameters */
     CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_compressionLevel, cLevel) );
-    CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_nbThreads, 2) );
+    CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_nbWorkers, 2) );
     CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_overlapSizeLog, 5) );
 
 
index a4fb5589ef0b4658f3d4e38364babcdae7a3d9d6..db87e2c5b540cf271d06a14568683152cbb717e1 100644 (file)
@@ -753,9 +753,9 @@ static int basicUnitTests(U32 seed, double compressibility)
     DISPLAYLEVEL(3, "OK \n");
 
     /* Complex multithreading + dictionary test */
-    {   U32 const nbThreads = 2;
+    {   U32 const nbWorkers = 2;
         size_t const jobSize = 4 * 1 MB;
-        size_t const srcSize = jobSize * nbThreads;  /* we want each job to have predictable size */
+        size_t const srcSize = jobSize * nbWorkers;  /* we want each job to have predictable size */
         size_t const segLength = 2 KB;
         size_t const offset = 600 KB;   /* must be larger than window defined in cdict */
         size_t const start = jobSize + (offset-1);
@@ -763,7 +763,7 @@ static int basicUnitTests(U32 seed, double compressibility)
         BYTE* const dst = (BYTE*)CNBuffer + start - offset;
         DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads + dictionary : ", testNb++, (U32)srcSize);
         CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_compressionLevel, 3) );
-        CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_nbThreads, 2) );
+        CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_nbWorkers, nbWorkers) );
         CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_jobSize, jobSize) );
         assert(start > offset);
         assert(start + segLength < COMPRESSIBLE_NOISE_LENGTH);
@@ -1672,7 +1672,7 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double
                     U32 const nbThreadsAdjusted = (windowLogMalus < nbThreadsCandidate) ? nbThreadsCandidate - windowLogMalus : 1;
                     U32 const nbThreads = MIN(nbThreadsAdjusted, nbThreadsMax);
                     DISPLAYLEVEL(5, "t%u: nbThreads : %u \n", testNb, nbThreads);
-                    CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_nbThreads, nbThreads, useOpaqueAPI) );
+                    CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_nbWorkers, nbThreads, useOpaqueAPI) );
                     if (nbThreads > 1) {
                         U32 const jobLog = FUZ_rand(&lseed) % (testLog+1);
                         CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_overlapSizeLog, FUZ_rand(&lseed) % 10, useOpaqueAPI) );