]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
better MT fluidity
authorYann Collet <cyan@fb.com>
Sat, 1 Feb 2025 08:41:36 +0000 (00:41 -0800)
committerYann Collet <cyan@fb.com>
Thu, 6 Feb 2025 02:42:00 +0000 (18:42 -0800)
--patch-from no longer blocked on first job dictionary loading

lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c

index d7585c884a33768af274663f8c26327279ef110f..c560c902a545fa65a4cb23283131b0a58aec7488 100644 (file)
@@ -4900,7 +4900,7 @@ size_t ZSTD_compressBlock(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const
 /*! ZSTD_loadDictionaryContent() :
  *  @return : 0, or an error code
  */
-static size_t 
+static size_t
 ZSTD_loadDictionaryContent(ZSTD_MatchState_t* ms,
                         ldmState_t* ls,
                         ZSTD_cwksp* ws,
@@ -5603,7 +5603,7 @@ static size_t ZSTD_initCDict_internal(
     return 0;
 }
 
-static ZSTD_CDict* 
+static ZSTD_CDict*
 ZSTD_createCDict_advanced_internal(size_t dictSize,
                                 ZSTD_dictLoadMethod_e dictLoadMethod,
                                 ZSTD_compressionParameters cParams,
index 1840ad8553582aed76b89e2fdc16883f5e3f31bb..8725b90716f92978bdc506ec6cffd04aa23d03f2 100644 (file)
@@ -577,9 +577,10 @@ static void ZSTDMT_serialState_free(SerialState* serialState)
     ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem);
 }
 
-static void ZSTDMT_serialState_update(SerialState* serialState,
-                                      ZSTD_CCtx* jobCCtx, RawSeqStore_t seqStore,
-                                      Range src, unsigned jobID)
+static void
+ZSTDMT_serialState_genSequences(SerialState* serialState,
+                                RawSeqStore_t* seqStore,
+                                Range src, unsigned jobID)
 {
     /* Wait for our turn */
     ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
@@ -592,12 +593,13 @@ static void ZSTDMT_serialState_update(SerialState* serialState,
         /* It is now our turn, do any processing necessary */
         if (serialState->params.ldmParams.enableLdm == ZSTD_ps_enable) {
             size_t error;
-            assert(seqStore.seq != NULL && seqStore.pos == 0 &&
-                   seqStore.size == 0 && seqStore.capacity > 0);
+            DEBUGLOG(6, "ZSTDMT_serialState_genSequences: LDM update");
+            assert(seqStore->seq != NULL && seqStore->pos == 0 &&
+                   seqStore->size == 0 && seqStore->capacity > 0);
             assert(src.size <= serialState->params.jobSize);
             ZSTD_window_update(&serialState->ldmState.window, src.start, src.size, /* forceNonContiguous */ 0);
             error = ZSTD_ldm_generateSequences(
-                &serialState->ldmState, &seqStore,
+                &serialState->ldmState, seqStore,
                 &serialState->params.ldmParams, src.start, src.size);
             /* We provide a large enough buffer to never fail. */
             assert(!ZSTD_isError(error)); (void)error;
@@ -616,10 +618,18 @@ static void ZSTDMT_serialState_update(SerialState* serialState,
     serialState->nextJobID++;
     ZSTD_pthread_cond_broadcast(&serialState->cond);
     ZSTD_pthread_mutex_unlock(&serialState->mutex);
+}
 
-    if (seqStore.size > 0) {
-        ZSTD_referenceExternalSequences(jobCCtx, seqStore.seq, seqStore.size);
-        assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable);
+static void
+ZSTDMT_serialState_applySequences(const SerialState* serialState, /* just for an assert() check */
+                                  ZSTD_CCtx* jobCCtx,
+                                  const RawSeqStore_t* seqStore)
+{
+    if (seqStore->size > 0) {
+        DEBUGLOG(5, "ZSTDMT_serialState_applySequences: uploading %u external sequences", (unsigned)seqStore->size);
+        assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable); (void)serialState;
+        assert(jobCCtx);
+        ZSTD_referenceExternalSequences(jobCCtx, seqStore->seq, seqStore->size);
     }
 }
 
@@ -689,6 +699,7 @@ static void ZSTDMT_compressionJob(void* jobDescription)
     Buffer dstBuff = job->dstBuff;
     size_t lastCBlockSize = 0;
 
+    DEBUGLOG(5, "ZSTDMT_compressionJob: job %u", job->jobID);
     /* resources */
     if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
     if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
@@ -710,6 +721,10 @@ static void ZSTDMT_compressionJob(void* jobDescription)
 
 
     /* init */
+
+    /* Perform serial step as early as possible */
+    ZSTDMT_serialState_genSequences(job->serial, &rawSeqStore, job->src, job->jobID);
+
     if (job->cdict) {
         size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize);
         assert(job->firstJob);  /* only allowed for first job */
@@ -723,16 +738,17 @@ static void ZSTDMT_compressionJob(void* jobDescription)
             size_t const err = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_deterministicRefPrefix, 0);
             if (ZSTD_isError(err)) JOB_ERROR(err);
         }
+        DEBUGLOG(6, "ZSTDMT_compressionJob: job %u: loading prefix of size %zu", job->jobID, job->prefix.size);
         {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
-                                        job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
+                                        job->prefix.start, job->prefix.size, ZSTD_dct_rawContent,
                                         ZSTD_dtlm_fast,
                                         NULL, /*cdict*/
                                         &jobParams, pledgedSrcSize);
             if (ZSTD_isError(initError)) JOB_ERROR(initError);
     }   }
 
-    /* Perform serial step as early as possible, but after CCtx initialization */
-    ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
+    /* External Sequences can only be applied after CCtx initialization */
+    ZSTDMT_serialState_applySequences(job->serial, cctx, &rawSeqStore);
 
     if (!job->firstJob) {  /* flush and overwrite frame header when it's not first job */
         size_t const hSize = ZSTD_compressContinue_public(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);