From: Yann Collet Date: Sat, 1 Feb 2025 08:41:36 +0000 (-0800) Subject: better MT fluidity X-Git-Tag: v1.5.7^2~9^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c7cd7dc04bede050475da32fa019c2d0712ed6cf;p=thirdparty%2Fzstd.git better MT fluidity --patch-from no longer blocked on first job dictionary loading --- diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index d7585c884..c560c902a 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -4900,7 +4900,7 @@ size_t ZSTD_compressBlock(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const /*! ZSTD_loadDictionaryContent() : * @return : 0, or an error code */ -static size_t +static size_t ZSTD_loadDictionaryContent(ZSTD_MatchState_t* ms, ldmState_t* ls, ZSTD_cwksp* ws, @@ -5603,7 +5603,7 @@ static size_t ZSTD_initCDict_internal( return 0; } -static ZSTD_CDict* +static ZSTD_CDict* ZSTD_createCDict_advanced_internal(size_t dictSize, ZSTD_dictLoadMethod_e dictLoadMethod, ZSTD_compressionParameters cParams, diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 1840ad855..8725b9071 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -577,9 +577,10 @@ static void ZSTDMT_serialState_free(SerialState* serialState) ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem); } -static void ZSTDMT_serialState_update(SerialState* serialState, - ZSTD_CCtx* jobCCtx, RawSeqStore_t seqStore, - Range src, unsigned jobID) +static void +ZSTDMT_serialState_genSequences(SerialState* serialState, + RawSeqStore_t* seqStore, + Range src, unsigned jobID) { /* Wait for our turn */ ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); @@ -592,12 +593,13 @@ static void ZSTDMT_serialState_update(SerialState* serialState, /* It is now our turn, do any processing necessary */ if (serialState->params.ldmParams.enableLdm == ZSTD_ps_enable) { size_t error; - assert(seqStore.seq != NULL && seqStore.pos == 0 && - seqStore.size == 0 && seqStore.capacity > 0); + DEBUGLOG(6, "ZSTDMT_serialState_genSequences: LDM update"); + assert(seqStore->seq != NULL && seqStore->pos == 0 && + seqStore->size == 0 && seqStore->capacity > 0); assert(src.size <= serialState->params.jobSize); ZSTD_window_update(&serialState->ldmState.window, src.start, src.size, /* forceNonContiguous */ 0); error = ZSTD_ldm_generateSequences( - &serialState->ldmState, &seqStore, + &serialState->ldmState, seqStore, &serialState->params.ldmParams, src.start, src.size); /* We provide a large enough buffer to never fail. */ assert(!ZSTD_isError(error)); (void)error; @@ -616,10 +618,18 @@ static void ZSTDMT_serialState_update(SerialState* serialState, serialState->nextJobID++; ZSTD_pthread_cond_broadcast(&serialState->cond); ZSTD_pthread_mutex_unlock(&serialState->mutex); +} - if (seqStore.size > 0) { - ZSTD_referenceExternalSequences(jobCCtx, seqStore.seq, seqStore.size); - assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable); +static void +ZSTDMT_serialState_applySequences(const SerialState* serialState, /* just for an assert() check */ + ZSTD_CCtx* jobCCtx, + const RawSeqStore_t* seqStore) +{ + if (seqStore->size > 0) { + DEBUGLOG(5, "ZSTDMT_serialState_applySequences: uploading %u external sequences", (unsigned)seqStore->size); + assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable); (void)serialState; + assert(jobCCtx); + ZSTD_referenceExternalSequences(jobCCtx, seqStore->seq, seqStore->size); } } @@ -689,6 +699,7 @@ static void ZSTDMT_compressionJob(void* jobDescription) Buffer dstBuff = job->dstBuff; size_t lastCBlockSize = 0; + DEBUGLOG(5, "ZSTDMT_compressionJob: job %u", job->jobID); /* resources */ if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation)); if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */ @@ -710,6 +721,10 @@ static void ZSTDMT_compressionJob(void* jobDescription) /* init */ + + /* Perform serial step as early as possible */ + ZSTDMT_serialState_genSequences(job->serial, &rawSeqStore, job->src, job->jobID); + if (job->cdict) { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize); assert(job->firstJob); /* only allowed for first job */ @@ -723,16 +738,17 @@ static void ZSTDMT_compressionJob(void* jobDescription) size_t const err = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_deterministicRefPrefix, 0); if (ZSTD_isError(err)) JOB_ERROR(err); } + DEBUGLOG(6, "ZSTDMT_compressionJob: job %u: loading prefix of size %zu", job->jobID, job->prefix.size); { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, - job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ + job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, ZSTD_dtlm_fast, NULL, /*cdict*/ &jobParams, pledgedSrcSize); if (ZSTD_isError(initError)) JOB_ERROR(initError); } } - /* Perform serial step as early as possible, but after CCtx initialization */ - ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID); + /* External Sequences can only be applied after CCtx initialization */ + ZSTDMT_serialState_applySequences(job->serial, cctx, &rawSeqStore); if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ size_t const hSize = ZSTD_compressContinue_public(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);