]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
fixed some complex scenarios
authorYann Collet <cyan@fb.com>
Thu, 16 Nov 2017 23:02:28 +0000 (15:02 -0800)
committerYann Collet <cyan@fb.com>
Thu, 16 Nov 2017 23:18:18 +0000 (15:18 -0800)
Fixed : multithreading to compress some small data with dictionary
Fixed : ZSTD_initCStream_usingCDict()
Improved streaming memory usage when pledgedSrcSize is known.

lib/compress/zstd_compress.c
lib/compress/zstd_opt.c
lib/compress/zstdmt_compress.h
tests/zstreamtest.c

index 0bdcc26abdc2f4d304b9d59db5eaeaf258c8a1f6..f0cebe7a39665f0b8f32c9c62bc61d68aa427e72 100644 (file)
@@ -249,7 +249,6 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
     case ZSTD_p_compressionLevel:
-        if (value == 0) return 0;  /* special value : 0 means "don't change anything" */
         if (cctx->cdict) return ERROR(stage_wrong);
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
@@ -260,7 +259,6 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
     case ZSTD_p_minMatch:
     case ZSTD_p_targetLength:
     case ZSTD_p_compressionStrategy:
-        if (value == 0) return 0;  /* special value : 0 means "don't change anything" */
         if (cctx->cdict) return ERROR(stage_wrong);
         ZSTD_cLevelToCParams(cctx);  /* Can optimize if srcSize is known */
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
@@ -277,8 +275,6 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
     case ZSTD_p_nbThreads:
-        if (value==0) return 0;
-        DEBUGLOG(5, " setting nbThreads : %u", value);
         if (value > 1 && cctx->staticSize) {
             return ERROR(parameter_unsupported);  /* MT not compatible with static alloc */
         }
@@ -288,22 +284,15 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
     case ZSTD_p_overlapSizeLog:
-        DEBUGLOG(5, " setting overlap with nbThreads == %u", cctx->requestedParams.nbThreads);
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
     case ZSTD_p_enableLongDistanceMatching:
         if (cctx->cdict) return ERROR(stage_wrong);
-        if (value != 0) {
-            ZSTD_cLevelToCParams(cctx);
-        }
+        if (value>0) ZSTD_cLevelToCParams(cctx);
         return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
     case ZSTD_p_ldmHashLog:
     case ZSTD_p_ldmMinMatch:
-        if (value == 0) return 0;  /* special value : 0 means "don't change anything" */
-        if (cctx->cdict) return ERROR(stage_wrong);
-        return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
-
     case ZSTD_p_ldmBucketSizeLog:
     case ZSTD_p_ldmHashEveryLog:
         if (cctx->cdict) return ERROR(stage_wrong);
@@ -779,24 +768,27 @@ static U32 ZSTD_equivalentLdmParams(ldmParams_t ldmParams1,
 
 typedef enum { ZSTDb_not_buffered, ZSTDb_buffered } ZSTD_buffered_policy_e;
 
-/* ZSTD_equivalentBuffPolicy() :
+/* ZSTD_sufficientBuff() :
  * check internal buffers exist for streaming if buffPol == ZSTDb_buffered .
  * Note : they are assumed to be correctly sized if ZSTD_equivalentCParams()==1 */
-static U32 ZSTD_equivalentBuffPolicy(size_t bufferSize, ZSTD_buffered_policy_e buffPol)
+static U32 ZSTD_sufficientBuff(size_t bufferSize, ZSTD_buffered_policy_e buffPol, ZSTD_compressionParameters cParams2, U64 pledgedSrcSize)
 {
-    if ((buffPol == ZSTDb_buffered) & (!bufferSize)) return 0;
-    return 1;
+    size_t const windowSize = MAX(1, (size_t)MIN(((U64)1 << cParams2.windowLog), pledgedSrcSize));
+    size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, windowSize);
+    size_t const neededBufferSize = (buffPol==ZSTDb_buffered) ? windowSize + blockSize : 0;
+    return (neededBufferSize <= bufferSize);
 }
 
 /** Equivalence for resetCCtx purposes */
 static U32 ZSTD_equivalentParams(ZSTD_CCtx_params params1,
                                  ZSTD_CCtx_params params2,
                                  size_t buffSize1,
-                                 ZSTD_buffered_policy_e buffPol2)
+                                 ZSTD_buffered_policy_e buffPol2,
+                                 U64 pledgedSrcSize)
 {
     return ZSTD_equivalentCParams(params1.cParams, params2.cParams) &&
            ZSTD_equivalentLdmParams(params1.ldmParams, params2.ldmParams) &&
-           ZSTD_equivalentBuffPolicy(buffSize1, buffPol2);
+           ZSTD_sufficientBuff(buffSize1, buffPol2, params2.cParams, pledgedSrcSize);
 }
 
 /*! ZSTD_continueCCtx() :
@@ -837,7 +829,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
     assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
 
     if (crp == ZSTDcrp_continue) {
-        if (ZSTD_equivalentParams(params, zc->appliedParams, zc->inBuffSize, zbuff)) {
+        if (ZSTD_equivalentParams(params, zc->appliedParams, zc->inBuffSize, zbuff, pledgedSrcSize)) {
             DEBUGLOG(4, "ZSTD_equivalentParams()==1 -> continue mode");
             assert(!(params.ldmParams.enableLdm &&
                      params.ldmParams.hashEveryLog == ZSTD_LDM_HASHEVERYLOG_NOTSET));
@@ -858,7 +850,8 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
                 ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength);
     }
 
-    {   size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << params.cParams.windowLog);
+    {   size_t const windowSize = MAX(1, (size_t)MIN(((U64)1 << params.cParams.windowLog), pledgedSrcSize));
+        size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, windowSize);
         U32    const divider = (params.cParams.searchLength==3) ? 3 : 4;
         size_t const maxNbSeq = blockSize / divider;
         size_t const tokenSpace = blockSize + 11*maxNbSeq;
@@ -870,7 +863,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
         size_t const h3Size = ((size_t)1) << hashLog3;
         size_t const tableSpace = (chainSize + hSize + h3Size) * sizeof(U32);
         size_t const buffOutSize = (zbuff==ZSTDb_buffered) ? ZSTD_compressBound(blockSize)+1 : 0;
-        size_t const buffInSize = (zbuff==ZSTDb_buffered) ? ((size_t)1 << params.cParams.windowLog) + blockSize : 0;
+        size_t const buffInSize = (zbuff==ZSTDb_buffered) ? windowSize + blockSize : 0;
         void* ptr;
 
         /* Check if workSpace is large enough, alloc a new one if needed */
@@ -886,10 +879,10 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
                 : 0;
             size_t const neededSpace = entropySpace + optSpace + ldmSpace +
                                        tableSpace + tokenSpace + bufferSpace;
-            DEBUGLOG(4, "Need %uKB workspace, including %uKB for tables",
-                        (U32)(neededSpace>>10), (U32)(tableSpace>>10));
-            DEBUGLOG(4, "chainSize: %u - hSize: %u - h3Size: %u",
-                        (U32)chainSize, (U32)hSize, (U32)h3Size);
+            DEBUGLOG(4, "Need %uKB workspace, including %uKB for tables, and %uKB for buffers",
+                        (U32)(neededSpace>>10), (U32)(tableSpace>>10), (U32)(bufferSpace>>10));
+            DEBUGLOG(4, "chainSize: %u - hSize: %u - h3Size: %u - windowSize: %u",
+                        (U32)chainSize, (U32)hSize, (U32)h3Size, (U32)windowSize);
 
             if (zc->workSpaceSize < neededSpace) {  /* too small : resize */
                 DEBUGLOG(4, "Need to update workSpaceSize from %uK to %uK",
@@ -917,7 +910,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
         zc->consumedSrcSize = 0;
         if (pledgedSrcSize == ZSTD_CONTENTSIZE_UNKNOWN)
             zc->appliedParams.fParams.contentSizeFlag = 0;
-        DEBUGLOG(5, "pledged content size : %u ; flag : %u",
+        DEBUGLOG(4, "pledged content size : %u ; flag : %u",
             (U32)pledgedSrcSize, zc->appliedParams.fParams.contentSizeFlag);
         zc->blockSize = blockSize;
 
@@ -1016,8 +1009,8 @@ void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx) {
 
 /*! ZSTD_copyCCtx_internal() :
  *  Duplicate an existing context `srcCCtx` into another one `dstCCtx`.
- *  The "context", in this case, refers to the hash and chain tables, entropy
- *  tables, and dictionary offsets.
+ *  The "context", in this case, refers to the hash and chain tables,
+ *  entropy tables, and dictionary offsets.
  *  Only works during stage ZSTDcs_init (i.e. after creation, but before first call to ZSTD_compressContinue()).
  *  pledgedSrcSize=0 means "empty".
  *  @return : 0, or an error code */
@@ -2079,6 +2072,7 @@ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
     assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
 
     if (cdict && cdict->dictContentSize>0) {
+        cctx->requestedParams = params;
         return ZSTD_copyCCtx_internal(cctx, cdict->refContext,
                                       params.fParams, pledgedSrcSize,
                                       zbuff);
@@ -2525,9 +2519,9 @@ size_t ZSTD_CStreamOutSize(void)
 }
 
 static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs,
-                    const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode,
-                    const ZSTD_CDict* cdict,
-                    const ZSTD_CCtx_params params, unsigned long long pledgedSrcSize)
+                    const void* const dict, size_t const dictSize, ZSTD_dictMode_e const dictMode,
+                    const ZSTD_CDict* const cdict,
+                    ZSTD_CCtx_params const params, unsigned long long const pledgedSrcSize)
 {
     DEBUGLOG(4, "ZSTD_resetCStream_internal");
     /* params are supposed to be fully validated at this point */
@@ -2606,8 +2600,9 @@ size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs,
                                             const ZSTD_CDict* cdict,
                                             ZSTD_frameParameters fParams,
                                             unsigned long long pledgedSrcSize)
-{   /* cannot handle NULL cdict (does not know what to do) */
-    if (!cdict) return ERROR(dictionary_wrong);
+{
+    DEBUGLOG(4, "ZSTD_initCStream_usingCDict_advanced");
+    if (!cdict) return ERROR(dictionary_wrong); /* cannot handle NULL cdict (does not know what to do) */
     {   ZSTD_CCtx_params params = zcs->requestedParams;
         params.cParams = ZSTD_getCParamsFromCDict(cdict);
         params.fParams = fParams;
@@ -2620,8 +2615,9 @@ size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs,
 /* note : cdict must outlive compression session */
 size_t ZSTD_initCStream_usingCDict(ZSTD_CStream* zcs, const ZSTD_CDict* cdict)
 {
-    ZSTD_frameParameters const fParams = { 0 /* contentSize */, 0 /* checksum */, 0 /* hideDictID */ };
-    return ZSTD_initCStream_usingCDict_advanced(zcs, cdict, fParams, 0);  /* note : will check that cdict != NULL */
+    ZSTD_frameParameters const fParams = { 0 /* contentSizeFlag */, 0 /* checksum */, 0 /* hideDictID */ };
+    DEBUGLOG(4, "ZSTD_initCStream_usingCDict");
+    return ZSTD_initCStream_usingCDict_advanced(zcs, cdict, fParams, ZSTD_CONTENTSIZE_UNKNOWN);  /* note : will check that cdict != NULL */
 }
 
 /* ZSTD_initCStream_advanced() :
@@ -2853,7 +2849,8 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
                 cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1, 0 /*dictSize*/);
 
 #ifdef ZSTD_MULTITHREAD
-        if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */
+        if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN)
+            params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */
         if (params.nbThreads > 1) {
             if (cctx->mtctx == NULL || cctx->appliedParams.nbThreads != params.nbThreads) {
                 ZSTDMT_freeCCtx(cctx->mtctx);
@@ -2874,6 +2871,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
                              prefixDict.dictMode, cctx->cdict, params,
                              cctx->pledgedSrcSizePlusOne-1) );
             assert(cctx->streamStage == zcss_load);
+            assert(cctx->appliedParams.nbThreads <= 1);
     }   }
 
     /* compression stage */
index 52486bd3cba26f57034f1ebe2f17325fb5ec3195..aebe55f882bb42028d72f26174a8d568fb916529 100644 (file)
@@ -231,7 +231,7 @@ static U32 ZSTD_insertAndFindFirstIndexHash3 (ZSTD_CCtx* zc, const BYTE* ip)
 /*-*************************************
 *  Binary Tree search
 ***************************************/
-//FORCE_INLINE_TEMPLATE
+FORCE_INLINE_TEMPLATE
 U32 ZSTD_insertBtAndGetAllMatches (
                         ZSTD_CCtx* zc,
                         const BYTE* const ip, const BYTE* const iLimit, const int extDict,
index 598e6daf58c952ea02f5b116358df58565b5ede0..269c54b1ac8fdb668f5d8cdc4de66e2bd6a161a5 100644 (file)
@@ -112,6 +112,9 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
 
 size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value);
 
+/* ZSTDMT_CCtxParam_setNbThreads()
+ * Set nbThreads, and clamp it correctly,
+ * but also reset jobSize and overlapLog */ 
 size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads);
 
 /*! ZSTDMT_initCStream_internal() :
index 384858edd1d360f2391b8d9180a082ac971873fe..4cdd6235bf5bcfb9b0c963a1f7aca91ee0d65b11 100644 (file)
@@ -473,6 +473,7 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
     DISPLAYLEVEL(3, "test%3i : digested dictionary : ", testNb++);
     {   ZSTD_CDict* const cdict = ZSTD_createCDict(dictionary.start, dictionary.filled, 1 /*byRef*/ );
         size_t const initError = ZSTD_initCStream_usingCDict(zc, cdict);
+        DISPLAYLEVEL(5, "ZSTD_initCStream_usingCDict result : %u ", (U32)initError);
         if (ZSTD_isError(initError)) goto _output_error;
         cSize = 0;
         outBuff.dst = compressedBuffer;
@@ -481,10 +482,13 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo
         inBuff.src = CNBuffer;
         inBuff.size = CNBufferSize;
         inBuff.pos = 0;
+        DISPLAYLEVEL(5, "- starting ZSTD_compressStream ");
         CHECK_Z( ZSTD_compressStream(zc, &outBuff, &inBuff) );
         if (inBuff.pos != inBuff.size) goto _output_error;   /* entire input should be consumed */
-        { size_t const r = ZSTD_endStream(zc, &outBuff);
-          if (r != 0) goto _output_error; }  /* error, or some data not flushed */
+        {   size_t const r = ZSTD_endStream(zc, &outBuff);
+            DISPLAYLEVEL(5, "- ZSTD_endStream result : %u ", (U32)r);
+            if (r != 0) goto _output_error;  /* error, or some data not flushed */
+        }
         cSize = outBuff.pos;
         ZSTD_freeCDict(cdict);
         DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/CNBufferSize*100);