By default, section sizes are 4x window size.
This new setting allow manual selection of section sizes.
The larger they are, the (slightly) better the compression ratio,
but also the higher the memory allocation cost,
and eventually the lesser the nb of possible threads,
since each section is compressed by a single thread.
It also introduces a prototype to set generic parameters,
ZSTDMT_setMTCtxParameter()
The idea is that it's possible to add enums
to extend the list of parameters that can be set this way.
This is more long-term oriented than a fixed-size struct.
Consider it as a test.
/* ====== Tuning parameters ====== */
-#ifndef ZSTDMT_SECTION_LOGSIZE_MIN
-# define ZSTDMT_SECTION_LOGSIZE_MIN 20 /* minimum size for a full compression job (20==2^20==1 MB) */
-#endif
-
#define ZSTDMT_NBTHREADS_MAX 128
unsigned frameEnded;
unsigned allJobsCompleted;
unsigned long long frameContentSize;
+ size_t sectionSize;
ZSTD_CDict* cdict;
ZSTD_CStream* cstream;
ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */
cctx->nbThreads = nbThreads;
cctx->jobIDMask = nbJobs - 1;
cctx->allJobsCompleted = 1;
+ cctx->sectionSize = 0;
cctx->factory = POOL_create(nbThreads, 1);
cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
return 0;
}
+unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value)
+{
+ switch(parameter)
+ {
+ case ZSTDMT_p_sectionSize :
+ mtctx->sectionSize = value;
+ return 0;
+ default :
+ return ERROR(compressionParameter_unsupported);
+ }
+}
+
+
+/* ------------------------------------------ */
+/* ===== Multi-threaded compression ===== */
+/* ------------------------------------------ */
size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
if (zcs->cdict == NULL) return ERROR(memory_allocation);
} }
zcs->frameContentSize = pledgedSrcSize;
- zcs->targetSectionSize = (size_t)1 << MAX(ZSTDMT_SECTION_LOGSIZE_MIN, (zcs->params.cParams.windowLog + 2));
+ zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2);
+ zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog);
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
* of patent rights can be found in the PATENTS file in the same directory.
*/
+
+/* Note : All prototypes defined in this file shall be considered experimental.
+ * There is no guarantee of API continuity (yet) on any of these prototypes */
+
/* === Dependencies === */
#include <stddef.h> /* size_t */
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_parameters */
/* === Streaming functions === */
-ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
-ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
-ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */
- ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
+ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel);
+ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
+
+ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
+
+ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
+ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
+
+
+/* === Advanced functions and parameters === */
+
+#ifndef ZSTDMT_SECTION_SIZE_MIN
+# define ZSTDMT_SECTION_SIZE_MIN (1U << 20) /* 1 MB - Minimum size of each compression job */
+#endif
+
+ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */
+ ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
-ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
+/* ZSDTMT_parameter :
+ * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */
+typedef enum { ZSTDMT_p_sectionSize /* size of input "section". Each section is compressed in parallel. 0 means default, which is dynamically determined within compression functions */
+ } ZSDTMT_parameter;
-ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
-ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
+/* ZSTDMT_setMTCtxParameter() :
+ * allow setting individual parameters, one at a time, among a list of enums defined in ZSTDMT_parameter.
+ * The function must be called typically after ZSTD_createCCtx().
+ * Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions.
+ * @return : 0, or an error code (which can be tested using ZSTD_isError()) */
+ZSTDLIB_API unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value);
#endif
g_nbThreads = nbThreads;
}
+static U32 g_blockSize = 0;
+void FIO_setBlockSize(unsigned blockSize) {
+ if (blockSize && g_nbThreads==1)
+ DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n");
+#ifdef ZSTD_MULTITHREAD
+ if (blockSize-1 < ZSTDMT_SECTION_SIZE_MIN-1) /* intentional underflow */
+ DISPLAYLEVEL(2, "Note : minimum block size is %u KB \n", (ZSTDMT_SECTION_SIZE_MIN>>10));
+#endif
+ g_blockSize = blockSize;
+}
/*-*************************************
if (comprParams->strategy) params.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1);
#ifdef ZSTD_MULTITHREAD
{ size_t const errorCode = ZSTDMT_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
+ if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode));
+ ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_sectionSize, g_blockSize);
#else
{ size_t const errorCode = ZSTD_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
-#endif
if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode));
+#endif
} }
free(dictBuffer);
}
void FIO_setRemoveSrcFile(unsigned flag);
void FIO_setMemLimit(unsigned memLimit);
void FIO_setNbThreads(unsigned nbThreads);
+void FIO_setBlockSize(unsigned blockSize);
/*-*************************************
DISPLAY( "--[no-]check : integrity check (default:enabled) \n");
#ifdef ZSTD_MULTITHREAD
DISPLAY( " -T# : use # threads for compression (default:1) \n");
+ DISPLAY( " -B# : select size of independent sections (default:0==automatic) \n");
#endif
#endif
#ifndef ZSTD_NODECOMPRESS
if (operation==zom_compress) {
#ifndef ZSTD_NOCOMPRESS
FIO_setNbThreads(nbThreads);
+ FIO_setBlockSize((U32)blockSize);
if ((filenameIdx==1) && outFileName)
operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams);
else