From: Yann Collet Date: Sat, 1 Feb 2025 09:53:03 +0000 (-0800) Subject: better job fluidity in MT when one job gets stuck X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5bda54abfb2fcf6c7e7592d33f0a2d238b197cdd;p=thirdparty%2Fzstd.git better job fluidity in MT when one job gets stuck, notably when the first job takes too long to load its prefix --- diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index dadd126b9..52c5a9865 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -4962,7 +4962,7 @@ ZSTD_loadDictionaryContent(ZSTD_MatchState_t* ms, } /* If the dict is larger than we can reasonably index in our tables, only load the suffix. */ - { U32 maxDictSize = 1U << MIN(MAX(params->cParams.hashLog + 3, params->cParams.chainLog + 1), 31); + { U32 const maxDictSize = 1U << MIN(MAX(params->cParams.hashLog + 3, params->cParams.chainLog + 1), 31); if (srcSize > maxDictSize) { ip = iend - maxDictSize; src = ip; diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 8725b9071..3b572e4b0 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -907,7 +907,8 @@ static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZS * update *nbJobsPtr to next power of 2 value, as size of table */ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem) { - U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1; + U32 const margin = MAX(4, *nbJobsPtr / 2); + U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr + margin) + 1; U32 const nbJobs = 1 << nbJobsLog2; U32 jobNb; ZSTDMT_jobDescription* const jobTable = (ZSTDMT_jobDescription*) @@ -927,8 +928,9 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom } static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) { - U32 nbJobs = nbWorkers + 2; - if (nbJobs > mtctx->jobIDMask+1) { /* need more job capacity */ + U32 const 
margin = MAX(4, nbWorkers); + U32 nbJobs = nbWorkers + margin; + if (nbJobs >= mtctx->jobIDMask) { /* need more job capacity */ ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); mtctx->jobIDMask = 0; mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem); @@ -947,7 +949,8 @@ static size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned n return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers); } -MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool) +MEM_STATIC ZSTDMT_CCtx* +ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool) { ZSTDMT_CCtx* mtctx; U32 nbJobs = nbWorkers + 2; @@ -1388,6 +1391,17 @@ static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) assert(job->consumed == 0); } +/* @returns 1 if there is anything ready to flush */ +static int ZSTDMT_anythingToFlush(const ZSTDMT_CCtx* mtctx) +{ + unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask; + int r = 0; + ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex); + r = mtctx->jobs[wJobID].dstFlushed < mtctx->jobs[wJobID].cSize; + ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); + return r; +} + static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp) { unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask; @@ -1456,13 +1470,22 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS mtctx->jobs[jobID].lastJob, mtctx->nextJobID, jobID); - if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) { + + if (ZSTDMT_anythingToFlush(mtctx)) { + if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) { + mtctx->nextJobID++; + mtctx->jobReady = 0; + } else { + DEBUGLOG(2, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID); + mtctx->jobReady = 1; + } + } 
else { + /* block here, wait for next available job */ + POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID]); mtctx->nextJobID++; mtctx->jobReady = 0; - } else { - DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID); - mtctx->jobReady = 1; } + return 0; }