]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt cli and API allow selection of section sizes
authorYann Collet <cyan@fb.com>
Wed, 25 Jan 2017 01:02:26 +0000 (17:02 -0800)
committerYann Collet <cyan@fb.com>
Wed, 25 Jan 2017 01:08:53 +0000 (17:08 -0800)
By default, section sizes are 4x window size.
This new setting allow manual selection of section sizes.
The larger they are, the (slightly) better the compression ratio,
but also the higher the memory allocation cost,
and eventually the lesser the nb of possible threads,
since each section is compressed by a single thread.

It also introduces a prototype to set generic parameters,
ZSTDMT_setMTCtxParameter()

The idea is that it's possible to add enums
to extend the list of parameters that can be set this way.
This is more long-term oriented than a fixed-size struct.
Consider it as a test.

lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h
programs/fileio.c
programs/fileio.h
programs/zstdcli.c

index 0b91ad4ea82eea4b3191dbbf0141b00b29a92807..1baccf0fca6e0e6a7d5106d736c68a03c6244610 100644 (file)
@@ -9,10 +9,6 @@
 
 
 /* ======   Tuning parameters   ====== */
-#ifndef ZSTDMT_SECTION_LOGSIZE_MIN
-#  define ZSTDMT_SECTION_LOGSIZE_MIN 20   /* minimum size for a full compression job (20==2^20==1 MB) */
-#endif
-
 #define ZSTDMT_NBTHREADS_MAX 128
 
 
@@ -285,6 +281,7 @@ struct ZSTDMT_CCtx_s {
     unsigned frameEnded;
     unsigned allJobsCompleted;
     unsigned long long frameContentSize;
+    size_t sectionSize;
     ZSTD_CDict* cdict;
     ZSTD_CStream* cstream;
     ZSTDMT_jobDescription jobs[1];   /* variable size (must lies at the end) */
@@ -304,6 +301,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
     cctx->nbThreads = nbThreads;
     cctx->jobIDMask = nbJobs - 1;
     cctx->allJobsCompleted = 1;
+    cctx->sectionSize = 0;
     cctx->factory = POOL_create(nbThreads, 1);
     cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
     cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
@@ -356,6 +354,22 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
     return 0;
 }
 
+unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value)
+{
+    switch(parameter)
+    {
+    case ZSTDMT_p_sectionSize :
+        mtctx->sectionSize = value;
+        return 0;
+    default :
+        return ERROR(compressionParameter_unsupported);
+    }
+}
+
+
+/* ------------------------------------------ */
+/* =====   Multi-threaded compression   ===== */
+/* ------------------------------------------ */
 
 size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
                            void* dst, size_t dstCapacity,
@@ -487,7 +501,8 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
             if (zcs->cdict == NULL) return ERROR(memory_allocation);
     }   }
     zcs->frameContentSize = pledgedSrcSize;
-    zcs->targetSectionSize = (size_t)1 << MAX(ZSTDMT_SECTION_LOGSIZE_MIN, (zcs->params.cParams.windowLog + 2));
+    zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2);
+    zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
     zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog);
     zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
     if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
index 84d25f7386d40b9025850bffbbd0411773bf1aca..c00782e9d5a2130a65ce4f6f9fa65b17c91899b8 100644 (file)
@@ -7,6 +7,10 @@
  * of patent rights can be found in the PATENTS file in the same directory.
  */
 
+
+/* Note : All prototypes defined in this file shall be considered experimental.
+ *        There is no guarantee of API continuity (yet) on any of these prototypes */
+
 /* ===   Dependencies   === */
 #include <stddef.h>   /* size_t */
 #define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_parameters */
@@ -27,12 +31,32 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
 
 /* ===   Streaming functions   === */
 
-ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
-ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize);    /**< pledgedSrcSize is optional and can be zero == unknown */
-ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize,   /**< dict can be released after init, a local copy is preserved within zcs */
-                                   ZSTD_parameters params, unsigned long long pledgedSrcSize);  /**< pledgedSrcSize is optional and can be zero == unknown */
+ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel);
+ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize);    /**< pledgedSrcSize is optional and can be zero == unknown */
+
+ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
+
+ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output);   /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
+ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output);     /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
+
+
+/* ===   Advanced functions and parameters  === */
+
+#ifndef ZSTDMT_SECTION_SIZE_MIN
+#  define ZSTDMT_SECTION_SIZE_MIN (1U << 20)   /* 1 MB - Minimum size of each compression job */
+#endif
+
+ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* dict, size_t dictSize,  /**< dict can be released after init, a local copy is preserved within zcs */
+                                          ZSTD_parameters params, unsigned long long pledgedSrcSize);  /**< pledgedSrcSize is optional and can be zero == unknown */
 
-ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
+/* ZSDTMT_parameter :
+ * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */
+typedef enum { ZSTDMT_p_sectionSize    /* size of input "section". Each section is compressed in parallel. 0 means default, which is dynamically determined within compression functions */
+    } ZSDTMT_parameter;
 
-ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);   /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
-ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);     /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
+/* ZSTDMT_setMTCtxParameter() :
+ * allow setting individual parameters, one at a time, among a list of enums defined in ZSTDMT_parameter.
+ * The function must be called typically after ZSTD_createCCtx().
+ * Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions.
+ * @return : 0, or an error code (which can be tested using ZSTD_isError()) */
+ZSTDLIB_API unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value);
index 3864a5fabfd22962a5010d1e1e288b927291f70e..86db12acb030070f4de8b0b115019ec97d14b13d 100644 (file)
@@ -113,6 +113,16 @@ void FIO_setNbThreads(unsigned nbThreads) {
 #endif
     g_nbThreads = nbThreads;
 }
+static U32 g_blockSize = 0;
+void FIO_setBlockSize(unsigned blockSize) {
+    if (blockSize && g_nbThreads==1)
+        DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n");
+#ifdef ZSTD_MULTITHREAD
+    if (blockSize-1 < ZSTDMT_SECTION_SIZE_MIN-1)   /* intentional underflow */
+        DISPLAYLEVEL(2, "Note : minimum block size is %u KB \n", (ZSTDMT_SECTION_SIZE_MIN>>10));
+#endif
+    g_blockSize = blockSize;
+}
 
 
 /*-*************************************
@@ -283,10 +293,12 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
             if (comprParams->strategy) params.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1);
 #ifdef ZSTD_MULTITHREAD
             {   size_t const errorCode = ZSTDMT_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
+                if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode));
+                ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_sectionSize, g_blockSize);
 #else
             {   size_t const errorCode = ZSTD_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
-#endif
                 if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode));
+#endif
         }   }
         free(dictBuffer);
     }
index 9ef4492945d141b32a66cdba7c535bec01f1d7e4..19f09c33a3239618d3f581c83a420a41d4f6c6d0 100644 (file)
@@ -41,6 +41,7 @@ void FIO_setChecksumFlag(unsigned checksumFlag);
 void FIO_setRemoveSrcFile(unsigned flag);
 void FIO_setMemLimit(unsigned memLimit);
 void FIO_setNbThreads(unsigned nbThreads);
+void FIO_setBlockSize(unsigned blockSize);
 
 
 /*-*************************************
index 785ecedeecc48e1a5273405f12b3f4e171692581..549dad01a2c97a2e8fd3837424005d7503706098 100644 (file)
@@ -118,6 +118,7 @@ static int usage_advanced(const char* programName)
     DISPLAY( "--[no-]check : integrity check (default:enabled) \n");
 #ifdef ZSTD_MULTITHREAD
     DISPLAY( " -T#    : use # threads for compression (default:1) \n");
+    DISPLAY( " -B#    : select size of independent sections (default:0==automatic) \n");
 #endif
 #endif
 #ifndef ZSTD_NODECOMPRESS
@@ -625,6 +626,7 @@ int main(int argCount, const char* argv[])
     if (operation==zom_compress) {
 #ifndef ZSTD_NOCOMPRESS
         FIO_setNbThreads(nbThreads);
+        FIO_setBlockSize((U32)blockSize);
         if ((filenameIdx==1) && outFileName)
           operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams);
         else