]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Move jobSize and overlapLog in zstdmt to cctxParams
authorStella Lau <laus@fb.com>
Fri, 25 Aug 2017 20:14:51 +0000 (13:14 -0700)
committerStella Lau <laus@fb.com>
Fri, 25 Aug 2017 20:14:51 +0000 (13:14 -0700)
lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c
tests/Makefile
tests/roundTripCrash.c

index 5c7a724cc4cdc415a067e0235071bc7b7f0a7e42..c870371963b833f17b1cb32214e0a0166d27e72c 100644 (file)
@@ -307,6 +307,10 @@ static ZSTD_CCtx_params ZSTD_assignParamsToCCtxParams(
         return ERROR(parameter_outOfBound);  \
 }   }
 
+size_t ZSTDMT_CCtxParam_setMTCtxParameter(
+    ZSTD_CCtx_params* params, ZSDTMT_parameter parameter, unsigned value);
+size_t ZSTDMT_initializeCCtxParameters(ZSTD_CCtx_params* params, unsigned nbThreads);
+
 size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned value)
 {
     if (cctx->streamStage != zcss_init) return ERROR(stage_wrong);
@@ -359,19 +363,20 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
             cctx->mtctx = ZSTDMT_createCCtx_advanced(value, cctx->customMem);
             if (cctx->mtctx == NULL) return ERROR(memory_allocation);
         }
-        cctx->requestedParams.nbThreads = value;
-        return 0;
+
+        /* Need to initialize overlapSizeLog */
+        return ZSTDMT_initializeCCtxParameters(&cctx->requestedParams, value);
 
     case ZSTD_p_jobSize:
         if (cctx->requestedParams.nbThreads <= 1) return ERROR(parameter_unsupported);
         assert(cctx->mtctx != NULL);
-        return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_sectionSize, value);
+        return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
     case ZSTD_p_overlapSizeLog:
         DEBUGLOG(5, " setting overlap with nbThreads == %u", cctx->requestedParams.nbThreads);
         if (cctx->requestedParams.nbThreads <= 1) return ERROR(parameter_unsupported);
         assert(cctx->mtctx != NULL);
-        return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_overlapSectionLog, value);
+        return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
 
     default: return ERROR(parameter_unsupported);
     }
@@ -471,19 +476,15 @@ size_t ZSTD_CCtxParam_setParameter(
 #ifndef ZSTD_MULTITHREAD
         if (value > 1) return ERROR(parameter_unsupported);
 #endif
-        /* Do checks when applying params to cctx */
-        params->nbThreads = value;
-        return 0;
+        return ZSTDMT_initializeCCtxParameters(params, value);
 
     case ZSTD_p_jobSize :
         if (params->nbThreads <= 1) return ERROR(parameter_unsupported);
-        params->jobSize = value;
-        return 0;
+        return ZSTDMT_CCtxParam_setMTCtxParameter(params, ZSTDMT_p_sectionSize, value);
 
     case ZSTD_p_overlapSizeLog :
         if (params->nbThreads <= 1) return ERROR(parameter_unsupported);
-        params->overlapSizeLog = value;
-        return 0;
+        return ZSTDMT_CCtxParam_setMTCtxParameter(params, ZSTDMT_p_overlapSectionLog, value);
 
     default: return ERROR(parameter_unsupported);
     }
index cf21177e0eb173b6a9b469b6e66c3a33ba3c303d..7c3bd0198f9f946ab2f6db569ea7bfa5a8e9239d 100644 (file)
@@ -404,15 +404,12 @@ struct ZSTDMT_CCtx_s {
     inBuff_t inBuff;
     ZSTD_CCtx_params params;
     XXH64_state_t xxhState;
-    unsigned nbThreads;
     unsigned jobIDMask;
     unsigned doneJobID;
     unsigned nextJobID;
     unsigned frameEnded;
     unsigned allJobsCompleted;
-    unsigned overlapLog;
     unsigned long long frameContentSize;
-    size_t sectionSize;
     ZSTD_customMem cMem;
     ZSTD_CDict* cdictLocal;
     const ZSTD_CDict* cdict;
@@ -427,6 +424,15 @@ static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customM
                             nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
 }
 
+/* Internal only */
+size_t ZSTDMT_initializeCCtxParameters(ZSTD_CCtx_params* params, unsigned nbThreads)
+{
+    params->nbThreads = nbThreads;
+    params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT;
+    params->jobSize = 0;
+    return 0;
+}
+
 ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
 {
     ZSTDMT_CCtx* mtctx;
@@ -441,11 +447,9 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
 
     mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem);
     if (!mtctx) return NULL;
+    ZSTDMT_initializeCCtxParameters(&mtctx->params, nbThreads);
     mtctx->cMem = cMem;
-    mtctx->nbThreads = nbThreads;
     mtctx->allJobsCompleted = 1;
-    mtctx->sectionSize = 0;
-    mtctx->overlapLog = ZSTDMT_OVERLAPLOG_DEFAULT;
     mtctx->factory = POOL_create(nbThreads, 1);
     mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
     mtctx->jobIDMask = nbJobs - 1;
@@ -516,22 +520,35 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
             + ZSTD_sizeof_CDict(mtctx->cdictLocal);
 }
 
-size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value)
-{
+/* Internal only */
+size_t ZSTDMT_CCtxParam_setMTCtxParameter(
+    ZSTD_CCtx_params* params, ZSDTMT_parameter parameter, unsigned value) {
     switch(parameter)
     {
     case ZSTDMT_p_sectionSize :
-        mtctx->sectionSize = value;
+        params->jobSize = value;
         return 0;
     case ZSTDMT_p_overlapSectionLog :
         DEBUGLOG(5, "ZSTDMT_p_overlapSectionLog : %u", value);
-        mtctx->overlapLog = (value >= 9) ? 9 : value;
+        params->overlapSizeLog = (value >= 9) ? 9 : value;
         return 0;
     default :
         return ERROR(parameter_unsupported);
     }
 }
 
+size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value)
+{
+    switch(parameter)
+    {
+    case ZSTDMT_p_sectionSize :
+        return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
+    case ZSTDMT_p_overlapSectionLog :
+        return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
+    default :
+        return ERROR(parameter_unsupported);
+    }
+}
 
 /* ------------------------------------------ */
 /* =====   Multi-threaded compression   ===== */
@@ -553,13 +570,12 @@ static size_t ZSTDMT_compress_advanced_internal(
                 void* dst, size_t dstCapacity,
           const void* src, size_t srcSize,
           const ZSTD_CDict* cdict,
-                ZSTD_CCtx_params const cctxParams,
-                unsigned overlapLog)
+                ZSTD_CCtx_params const params)
 {
-    ZSTD_CCtx_params const jobParams = ZSTDMT_makeJobCCtxParams(cctxParams);
-    unsigned const overlapRLog = (overlapLog>9) ? 0 : 9-overlapLog;
-    size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (cctxParams.cParams.windowLog - overlapRLog);
-    unsigned nbChunks = computeNbChunks(srcSize, cctxParams.cParams.windowLog, mtctx->nbThreads);
+    ZSTD_CCtx_params const jobParams = ZSTDMT_makeJobCCtxParams(params);
+    unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
+    size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
+    unsigned nbChunks = computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads);
     size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
     size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize;   /* avoid too small last block */
     const char* const srcStart = (const char*)src;
@@ -567,6 +583,8 @@ static size_t ZSTDMT_compress_advanced_internal(
     unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize));  /* presumes avgChunkSize >= 256 KB, which should be the case */
     size_t frameStartPos = 0, dstBufferPos = 0;
     XXH64_state_t xxh64;
+    assert(jobParams.nbThreads == 0);
+    assert(mtctx->cctxPool.totalCCtx == params.nbThreads);
 
     DEBUGLOG(4, "nbChunks  : %2u   (chunkSize : %u bytes)   ", nbChunks, (U32)avgChunkSize);
     if (nbChunks==1) {   /* fallback to single-thread mode */
@@ -613,7 +631,7 @@ static size_t ZSTDMT_compress_advanced_internal(
             mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
             mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
 
-            if (cctxParams.fParams.checksumFlag) {
+            if (params.fParams.checksumFlag) {
                 XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
             }
 
@@ -656,8 +674,8 @@ static size_t ZSTDMT_compress_advanced_internal(
             }
         }  /* for (chunkID=0; chunkID<nbChunks; chunkID++) */
 
-        DEBUGLOG(4, "checksumFlag : %u ", cctxParams.fParams.checksumFlag);
-        if (cctxParams.fParams.checksumFlag) {
+        DEBUGLOG(4, "checksumFlag : %u ", params.fParams.checksumFlag);
+        if (params.fParams.checksumFlag) {
             U32 const checksum = (U32)XXH64_digest(&xxh64);
             if (dstPos + 4 > dstCapacity) {
                 error = ERROR(dstSize_tooSmall);
@@ -682,10 +700,11 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
     ZSTD_CCtx_params cctxParams = mtctx->params;
     cctxParams.cParams = params.cParams;
     cctxParams.fParams = params.fParams;
+    cctxParams.overlapSizeLog = overlapLog;
     return ZSTDMT_compress_advanced_internal(mtctx,
                                              dst, dstCapacity,
                                              src, srcSize,
-                                             cdict, cctxParams, overlapLog);
+                                             cdict, cctxParams);
 }
 
 
@@ -722,20 +741,22 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
 
 size_t ZSTDMT_initCStream_internal(
         ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize,
-        const ZSTD_CDict* cdict, ZSTD_CCtx_params cctxParams,
+        const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
         unsigned long long pledgedSrcSize)
 {
     DEBUGLOG(4, "ZSTDMT_initCStream_internal");
     /* params are supposed to be fully validated at this point */
-    assert(!ZSTD_isError(ZSTD_checkCParams(cctxParams.cParams)));
+    assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
     assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
+    assert(mtctx->cctxPool.totalCCtx == params.nbThreads);
 
-    if (zcs->nbThreads==1) {
-        ZSTD_CCtx_params const jobParams = ZSTDMT_makeJobCCtxParams(cctxParams);
+    if (params.nbThreads==1) {
+        ZSTD_CCtx_params const singleThreadParams = ZSTDMT_makeJobCCtxParams(params);
         DEBUGLOG(4, "single thread mode");
+        assert(singleThreadParams.nbThreads == 0);
         return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0],
                                          dict, dictSize, cdict,
-                                         jobParams, pledgedSrcSize);
+                                         singleThreadParams, pledgedSrcSize);
     }
 
     if (zcs->allJobsCompleted == 0) {   /* previous compression not correctly finished */
@@ -744,14 +765,14 @@ size_t ZSTDMT_initCStream_internal(
         zcs->allJobsCompleted = 1;
     }
 
-    zcs->params = cctxParams;
+    zcs->params = params;
     zcs->frameContentSize = pledgedSrcSize;
     if (dict) {
         DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal);
         ZSTD_freeCDict(zcs->cdictLocal);
         zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
                                                     0 /* byRef */, ZSTD_dm_auto, /* note : a loadPrefix becomes an internal CDict */
-                                                    cctxParams.cParams, zcs->cMem);
+                                                    params.cParams, zcs->cMem);
         zcs->cdict = zcs->cdictLocal;
         if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
     } else {
@@ -761,10 +782,10 @@ size_t ZSTDMT_initCStream_internal(
         zcs->cdict = cdict;
     }
 
-    zcs->targetDictSize = (zcs->overlapLog==0) ? 0 : (size_t)1 << (zcs->params.cParams.windowLog - (9 - zcs->overlapLog));
-    DEBUGLOG(4, "overlapLog : %u ", zcs->overlapLog);
+    zcs->targetDictSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog));
+    DEBUGLOG(4, "overlapLog : %u ", params.overlapSizeLog);
     DEBUGLOG(4, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10));
-    zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2);
+    zcs->targetSectionSize = params.jobSize ? params.jobSize : (size_t)1 << (params.cParams.windowLog + 2);
     zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
     zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize);
     DEBUGLOG(4, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10));
@@ -776,7 +797,7 @@ size_t ZSTDMT_initCStream_internal(
     zcs->nextJobID = 0;
     zcs->frameEnded = 0;
     zcs->allJobsCompleted = 0;
-    if (cctxParams.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
+    if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
     return 0;
 }
 
@@ -798,11 +819,12 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
                                      ZSTD_frameParameters fParams,
                                      unsigned long long pledgedSrcSize)
 {
-    ZSTD_CCtx_params params = ZSTD_getCCtxParamsFromCDict(cdict);
+    ZSTD_CCtx_params cctxParams = mtctx->params;
+    cctxParams.cParams = ZSTD_getCCtxParamsFromCDict(cdict).cParams;
+    cctxParams.fParams = fParams;
     if (cdict==NULL) return ERROR(dictionary_wrong);   /* method incompatible with NULL cdict */
-    params.fParams = fParams;
     return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, cdict,
-                                       params, pledgedSrcSize);
+                                       cctxParams, pledgedSrcSize);
 }
 
 
@@ -810,7 +832,7 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
  * pledgedSrcSize is optional and can be zero == unknown */
 size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
 {
-    if (zcs->nbThreads==1)
+    if (zcs->params.nbThreads==1)
         return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize);
     return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params,
                                        pledgedSrcSize);
@@ -965,7 +987,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
         /* current frame being ended. Only flush/end are allowed. Or start new frame with init */
         return ERROR(stage_wrong);
     }
-    if (mtctx->nbThreads==1) {  /* delegate to single-thread (synchronous) */
+    if (mtctx->params.nbThreads==1) {  /* delegate to single-thread (synchronous) */
         return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp);
     }
 
@@ -977,7 +999,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
         size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
                 (char*)output->dst + output->pos, output->size - output->pos,
                 (const char*)input->src + input->pos, input->size - input->pos,
-                mtctx->cdict, mtctx->params, mtctx->overlapLog);
+                mtctx->cdict, mtctx->params);
         if (ZSTD_isError(cSize)) return cSize;
         input->pos = input->size;
         output->pos += cSize;
@@ -1052,7 +1074,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
 size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
 {
     DEBUGLOG(5, "ZSTDMT_flushStream");
-    if (zcs->nbThreads==1)
+    if (zcs->params.nbThreads==1)
         return ZSTD_flushStream(zcs->cctxPool->cctx[0], output);
     return ZSTDMT_flushStream_internal(zcs, output, 0 /* endFrame */);
 }
@@ -1060,7 +1082,7 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
 size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
 {
     DEBUGLOG(4, "ZSTDMT_endStream");
-    if (zcs->nbThreads==1)
+    if (zcs->params.nbThreads==1)
         return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
     return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */);
 }
index 006059b72d31e51380c75f586d6c678f67a92a93..f6389bd911635c2500f8eb8acd0ae06eec14ef73 100644 (file)
@@ -167,7 +167,7 @@ datagen : $(PRGDIR)/datagen.c datagencli.c
        $(CC)      $(FLAGS) $^ -o $@$(EXT)
 
 roundTripCrash : $(ZSTD_FILES) roundTripCrash.c
-       $(CC)      $(FLAGS) $^ -o $@$(EXT)
+       $(CC)      $(FLAGS) $(MULTITHREAD) $^ -o $@$(EXT)
 
 longmatch  : $(ZSTD_FILES) longmatch.c
        $(CC)      $(FLAGS) $^ -o $@$(EXT)
index fb14fa87b80bf0321adf8101cca0fb5f4b6d2f75..cb0221c5804d12f1e4712dbe01788af7f568b9e7 100644 (file)
@@ -93,6 +93,9 @@ static size_t cctxParamRoundTripTest(void* resultBuff, size_t resultBuffCapacity
 
     /* Set parameters */
     CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_compressionLevel, cLevel) );
+    CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_nbThreads, 2) );
+    CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_overlapSizeLog, 5) );
+
 
     /* Apply parameters */
     CHECK_Z( ZSTD_CCtx_applyCCtxParams(cctx, cctxParams) );