}
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
+ case ZSTD_p_nonBlockingMode:
case ZSTD_p_jobSize:
- return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
-
case ZSTD_p_overlapSizeLog:
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
return ZSTDMT_CCtxParam_setNbThreads(CCtxParams, value);
#endif
+ case ZSTD_p_nonBlockingMode :
+#ifndef ZSTD_MULTITHREAD
+ return ERROR(parameter_unsupported);
+#else
+ CCtxParams->nonBlockingMode = (value>0);
+ return CCtxParams->nonBlockingMode;
+#endif
+
case ZSTD_p_jobSize :
#ifndef ZSTD_MULTITHREAD
return ERROR(parameter_unsupported);
#else
- if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported);
return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_jobSize, value);
#endif
#ifndef ZSTD_MULTITHREAD
return ERROR(parameter_unsupported);
#else
- if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported);
return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_overlapSectionLog, value);
#endif
cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1, 0 /*dictSize*/);
#ifdef ZSTD_MULTITHREAD
- if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN)
+ if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) {
params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */
- if (params.nbThreads > 1) {
+ params.nonBlockingMode = 0;
+ }
+ if ((params.nbThreads > 1) | (params.nonBlockingMode == 1)) {
if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) {
- DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u (previous: %u)",
- params.nbThreads, (unsigned)ZSTDMT_getNbThreads(cctx->mtctx));
+ DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u",
+ params.nbThreads);
+ if (cctx->mtctx != NULL)
+ DEBUGLOG(4, "ZSTD_compress_generic: previous nbThreads was %u",
+ ZSTDMT_getNbThreads(cctx->mtctx));
ZSTDMT_freeCCtx(cctx->mtctx);
cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem);
if (cctx->mtctx == NULL) return ERROR(memory_allocation);
cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) );
cctx->streamStage = zcss_load;
cctx->appliedParams.nbThreads = params.nbThreads;
+ cctx->appliedParams.nonBlockingMode = params.nonBlockingMode;
} else
#endif
{ CHECK_F( ZSTD_resetCStream_internal(
/* compression stage */
#ifdef ZSTD_MULTITHREAD
- if (cctx->appliedParams.nbThreads > 1) {
+ if ((cctx->appliedParams.nbThreads > 1) | (cctx->appliedParams.nonBlockingMode==1)) {
size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
if ( ZSTD_isError(flushMin)
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
ZSTD_p_dictIDFlag, /* When applicable, dictionary's ID is written into frame header (default:1) */
/* multi-threading parameters */
+ /* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).
+ * They return an error otherwise. */
ZSTD_p_nbThreads=400, /* Select how many threads a compression job can spawn (default:1)
* More threads improve speed, but also increase memory usage.
* Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled.
* Special: value 0 means "do not change nbThreads" */
+ ZSTD_p_nonBlockingMode, /* Single thread mode is by default "blocking" :
+ * it finishes its job as much as possible, and only then gives back control to caller.
+ * In contrast, multi-thread is by default "non-blocking" :
+ * it takes some input, flush some output if available, and immediately gives back control to caller.
+ * Compression work is performed in parallel, within worker threads.
+ * (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking)
+ * Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected.
+ * It allows the caller to do other tasks while the worker thread compresses in parallel. */
ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.
* Each compression job is completed in parallel, so indirectly controls the nb of active threads.
* 0 means default, which is dynamically determined based on compression parameters.