]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
introduced parameter ZSTD_p_nonBlockingMode
authorYann Collet <cyan@fb.com>
Wed, 17 Jan 2018 00:15:47 +0000 (16:15 -0800)
committerYann Collet <cyan@fb.com>
Wed, 17 Jan 2018 00:15:47 +0000 (16:15 -0800)
This new parameter makes it possible to call
streaming ZSTDMT with a single thread set
which is non blocking.

It makes it possible for the main thread to do other tasks in parallel
while the worker thread does compression.
Typically, for zstd cli, it means it can do I/O stuff.

Applied within fileio.c, this patch provides non-negligible gains during compression.

Tested on my laptop, with enwik9 (1000000000 bytes) : time zstd -f enwik9

With traditional single-thread blocking mode :
real    0m9.557s
user    0m8.861s
sys     0m0.538s

With new single-worker non blocking mode :
real    0m7.938s
user    0m8.049s
sys     0m0.514s

=> 20% faster

lib/compress/zstd_compress.c
lib/compress/zstd_compress_internal.h
lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h
lib/zstd.h
programs/fileio.c

index d2703f2316f1c779ab59255b105ae55e7495c11c..fbae3c3f22d68d83e9ae205d887bb53970cf7b91 100644 (file)
@@ -281,9 +281,8 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
         }
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
+    case ZSTD_p_nonBlockingMode:
     case ZSTD_p_jobSize:
-        return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
-
     case ZSTD_p_overlapSizeLog:
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
@@ -407,11 +406,18 @@ size_t ZSTD_CCtxParam_setParameter(
         return ZSTDMT_CCtxParam_setNbThreads(CCtxParams, value);
 #endif
 
+    case ZSTD_p_nonBlockingMode :
+#ifndef ZSTD_MULTITHREAD
+        return ERROR(parameter_unsupported);
+#else
+        CCtxParams->nonBlockingMode = (value>0);
+        return CCtxParams->nonBlockingMode;
+#endif
+
     case ZSTD_p_jobSize :
 #ifndef ZSTD_MULTITHREAD
         return ERROR(parameter_unsupported);
 #else
-        if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported);
         return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_jobSize, value);
 #endif
 
@@ -419,7 +425,6 @@ size_t ZSTD_CCtxParam_setParameter(
 #ifndef ZSTD_MULTITHREAD
         return ERROR(parameter_unsupported);
 #else
-        if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported);
         return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_overlapSectionLog, value);
 #endif
 
@@ -3007,12 +3012,17 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
                 cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1, 0 /*dictSize*/);
 
 #ifdef ZSTD_MULTITHREAD
-        if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN)
+        if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) {
             params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */
-        if (params.nbThreads > 1) {
+            params.nonBlockingMode = 0;
+        }
+        if ((params.nbThreads > 1) | (params.nonBlockingMode == 1)) {
             if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) {
-                DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u (previous: %u)",
-                            params.nbThreads, (unsigned)ZSTDMT_getNbThreads(cctx->mtctx));
+                DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u",
+                            params.nbThreads);
+                if (cctx->mtctx != NULL)
+                    DEBUGLOG(4, "ZSTD_compress_generic: previous nbThreads was %u",
+                                ZSTDMT_getNbThreads(cctx->mtctx));
                 ZSTDMT_freeCCtx(cctx->mtctx);
                 cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem);
                 if (cctx->mtctx == NULL) return ERROR(memory_allocation);
@@ -3024,6 +3034,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
                         cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) );
             cctx->streamStage = zcss_load;
             cctx->appliedParams.nbThreads = params.nbThreads;
+            cctx->appliedParams.nonBlockingMode = params.nonBlockingMode;
         } else
 #endif
         {   CHECK_F( ZSTD_resetCStream_internal(
@@ -3036,7 +3047,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
 
     /* compression stage */
 #ifdef ZSTD_MULTITHREAD
-    if (cctx->appliedParams.nbThreads > 1) {
+    if ((cctx->appliedParams.nbThreads > 1) | (cctx->appliedParams.nonBlockingMode==1)) {
         size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
         if ( ZSTD_isError(flushMin)
           || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
index 7be20d4c5748851c75ab6a8e35f532a8649320d6..eb18cbc5e4dd5c82247472163c15aed7a029b1dc 100644 (file)
@@ -150,6 +150,7 @@ struct ZSTD_CCtx_params_s {
 
     /* Multithreading: used to pass parameters to mtctx */
     U32 nbThreads;
+    int nonBlockingMode;      /* will trigger ZSTDMT even with nbThreads==1 */
     unsigned jobSize;
     unsigned overlapSizeLog;
 
index 0dadda284ae3128f1ab58ff2f1bd71e925da55df..0c28eb7871166f7c09f185c7702d4417d25477e1 100644 (file)
@@ -497,7 +497,7 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread
 /* ZSTDMT_getNbThreads():
  * @return nb threads currently active in mtctx.
  * mtctx must be valid */
-size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
+unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
 {
     assert(mtctx != NULL);
     return mtctx->params.nbThreads;
index b6e68684e0268e40436d71be2978df2aaa3b24b1..7716ea68e438b818eac76ba8bf52a534d0ee72de 100644 (file)
@@ -120,7 +120,7 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread
 /* ZSTDMT_getNbThreads():
  * @return nb threads currently active in mtctx.
  * mtctx must be valid */
-size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx);
+unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx);
 
 /*! ZSTDMT_initCStream_internal() :
  *  Private use only. Init streaming operation.
index a84490d995af79c268ead88d2589f107c2172da9..6ac132a6671bedd3034e368f0b74c23bfecaee3e 100644 (file)
@@ -972,10 +972,20 @@ typedef enum {
     ZSTD_p_dictIDFlag,       /* When applicable, dictionary's ID is written into frame header (default:1) */
 
     /* multi-threading parameters */
+    /* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).
+     * They return an error otherwise. */
     ZSTD_p_nbThreads=400,    /* Select how many threads a compression job can spawn (default:1)
                               * More threads improve speed, but also increase memory usage.
                               * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled.
                               * Special: value 0 means "do not change nbThreads" */
+    ZSTD_p_nonBlockingMode,  /* Single thread mode is by default "blocking" :
+                              * it finishes its job as much as possible, and only then gives back control to caller.
+                              * In contrast, multi-thread is by default "non-blocking" :
+                              * it takes some input, flush some output if available, and immediately gives back control to caller.
+                              * Compression work is performed in parallel, within worker threads.
+                              * (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking)
+                              * Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected.
+                              * It allows the caller to do other tasks while the worker thread compresses in parallel. */
     ZSTD_p_jobSize,          /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.
                               * Each compression job is completed in parallel, so indirectly controls the nb of active threads.
                               * 0 means default, which is dynamically determined based on compression parameters.
index 3ae2d40575fb6fe778dd127df7aab546eab91903..ea84853c7de5320d50f4544c9a577ce9cc4d6c90 100644 (file)
@@ -457,6 +457,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
         /* multi-threading */
         DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads);
         CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) );
+        CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nonBlockingMode, 1) );
         /* dictionary */
         CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) );  /* just for dictionary loading, for compression parameters adaptation */
         CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) );