]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
[zstdmt] Fix jobsize bugs (#1205)
authorNick Terrell <nickrterrell@gmail.com>
Mon, 25 Jun 2018 22:21:08 +0000 (15:21 -0700)
committerGitHub <noreply@github.com>
Mon, 25 Jun 2018 22:21:08 +0000 (15:21 -0700)
[zstdmt] Fix jobsize bugs

* `ZSTDMT_serialState_reset()` should use `targetSectionSize`, not `jobSize` when sizing the seqstore.
  Add an assert that checks that we sized the seqstore using the right job size.
* `ZSTDMT_compressionJob()` should check if `rawSeqStore.seq == NULL`.
* `ZSTDMT_initCStream_internal()` should not adjust `mtctx->params.jobSize` (clamping to MIN/MAX is okay).

lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h
tests/zstreamtest.c

index b180b1b7d1740f6e29b2063178ffc26e65b633e2..6daedca8b3d0a57af2ad48e5fa7934a521a92e6d 100644 (file)
@@ -459,7 +459,7 @@ typedef struct {
     ZSTD_window_t ldmWindow;  /* A thread-safe copy of ldmState.window */
 } serialState_t;
 
-static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params)
+static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params, size_t jobSize)
 {
     /* Adjust parameters */
     if (params.ldmParams.enableLdm) {
@@ -486,7 +486,7 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool*
             serialState->params.ldmParams.hashLog -
             serialState->params.ldmParams.bucketSizeLog;
         /* Size the seq pool tables */
-        ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize));
+        ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize));
         /* Reset the window */
         ZSTD_window_clear(&serialState->ldmState.window);
         serialState->ldmWindow = serialState->ldmState.window;
@@ -506,6 +506,7 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool*
         memset(serialState->ldmState.bucketOffsets, 0, bucketSize);
     }
     serialState->params = params;
+    serialState->params.jobSize = (U32)jobSize;
     return 0;
 }
 
@@ -547,6 +548,7 @@ static void ZSTDMT_serialState_update(serialState_t* serialState,
             size_t error;
             assert(seqStore.seq != NULL && seqStore.pos == 0 &&
                    seqStore.size == 0 && seqStore.capacity > 0);
+            assert(src.size <= serialState->params.jobSize);
             ZSTD_window_update(&serialState->ldmState.window, src.start, src.size);
             error = ZSTD_ldm_generateSequences(
                 &serialState->ldmState, &seqStore,
@@ -635,13 +637,6 @@ void ZSTDMT_compressionJob(void* jobDescription)
     rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
     buffer_t dstBuff = job->dstBuff;
 
-    /* Don't compute the checksum for chunks, since we compute it externally,
-     * but write it in the header.
-     */
-    if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
-    /* Don't run LDM for the chunks, since we handle it externally */
-    jobParams.ldmParams.enableLdm = 0;
-
     /* ressources */
     if (cctx==NULL) {
         job->cSize = ERROR(memory_allocation);
@@ -655,6 +650,18 @@ void ZSTDMT_compressionJob(void* jobDescription)
         }
         job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
     }
+    if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL) {
+        job->cSize = ERROR(memory_allocation);
+        goto _endJob;
+    }
+
+    /* Don't compute the checksum for chunks, since we compute it externally,
+     * but write it in the header.
+     */
+    if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
+    /* Don't run LDM for the chunks, since we handle it externally */
+    jobParams.ldmParams.enableLdm = 0;
+
 
     /* init */
     if (job->cdict) {
@@ -972,6 +979,8 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
         if ( (value > 0)  /* value==0 => automatic job size */
            & (value < ZSTDMT_JOBSIZE_MIN) )
             value = ZSTDMT_JOBSIZE_MIN;
+        if (value > ZSTDMT_JOBSIZE_MAX)
+            value = ZSTDMT_JOBSIZE_MAX;
         params->jobSize = value;
         return value;
     case ZSTDMT_p_overlapSectionLog :
@@ -998,6 +1007,21 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter,
     }
 }
 
+size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value)
+{
+    switch (parameter) {
+    case ZSTDMT_p_jobSize:
+        *value = mtctx->params.jobSize;
+        break;
+    case ZSTDMT_p_overlapSectionLog:
+        *value = mtctx->params.overlapSizeLog;
+        break;
+    default:
+        return ERROR(parameter_unsupported);
+    }
+    return 0;
+}
+
 /* Sets parameters relevant to the compression job,
  * initializing others to default values. */
 static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
@@ -1143,7 +1167,7 @@ static size_t ZSTDMT_compress_advanced_internal(
 
     assert(avgJobSize >= 256 KB);  /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
     ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
-    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
+    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize))
         return ERROR(memory_allocation);
 
     CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbJobs) );  /* only expands if necessary */
@@ -1280,9 +1304,7 @@ size_t ZSTDMT_initCStream_internal(
     if (params.nbWorkers != mtctx->params.nbWorkers)
         CHECK_F( ZSTDMT_resize(mtctx, params.nbWorkers) );
 
-    if (params.jobSize == 0) {
-        params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params);
-    }
+    if (params.jobSize > 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN;
     if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
 
     mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN);  /* do not trigger multi-threading when srcSize is too small */
@@ -1321,7 +1343,9 @@ size_t ZSTDMT_initCStream_internal(
     mtctx->targetPrefixSize = (size_t)1 << ZSTDMT_computeOverlapLog(params);
     DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10));
     mtctx->targetSectionSize = params.jobSize;
-    if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
+    if (mtctx->targetSectionSize == 0) {
+        mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params);
+    }
     if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize;  /* job size must be >= overlap size */
     DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
     DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
@@ -1363,7 +1387,7 @@ size_t ZSTDMT_initCStream_internal(
     mtctx->allJobsCompleted = 0;
     mtctx->consumed = 0;
     mtctx->produced = 0;
-    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
+    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize))
         return ERROR(memory_allocation);
     return 0;
 }
index 4249a82dea872eb3ce3a92f84e783e888e52e5b2..34a475a42bffc0ed216b1f33e947270930375916 100644 (file)
@@ -95,6 +95,11 @@ typedef enum {
  * @return : 0, or an error code (which can be tested using ZSTD_isError()) */
 ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value);
 
+/* ZSTDMT_getMTCtxParameter() :
+ * Query the ZSTDMT_CCtx for a parameter value.
+ * @return : 0, or an error code (which can be tested using ZSTD_isError()) */
+ZSTDLIB_API size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value);
+
 
 /*! ZSTDMT_compressStream_generic() :
  *  Combines ZSTDMT_compressStream() with optional ZSTDMT_flushStream() or ZSTDMT_endStream()
index 70da297289ff182ce1a89930a85915a65f212336..22c49cb3526b627afc3add92f6deafc1a68390f7 100644 (file)
@@ -844,7 +844,12 @@ static int basicUnitTests(U32 seed, double compressibility)
     /* Basic multithreading compression test */
     DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
     {   ZSTD_parameters const params = ZSTD_getParams(1, 0, 0);
+        unsigned jobSize;
+        CHECK_Z( ZSTDMT_getMTCtxParameter(mtctx, ZSTDMT_p_jobSize, &jobSize));
+        CHECK(jobSize != 0, "job size non-zero");
         CHECK_Z( ZSTDMT_initCStream_advanced(mtctx, CNBuffer, dictSize, params, CNBufferSize) );
+        CHECK_Z( ZSTDMT_getMTCtxParameter(mtctx, ZSTDMT_p_jobSize, &jobSize));
+        CHECK(jobSize != 0, "job size non-zero");
     }
     outBuff.dst = compressedBuffer;
     outBuff.size = compressedBufferSize;