]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
better job fluidity in MT when one job get stuck
authorYann Collet <cyan@fb.com>
Sat, 1 Feb 2025 09:53:03 +0000 (01:53 -0800)
committerYann Collet <cyan@fb.com>
Sat, 1 Feb 2025 10:48:48 +0000 (02:48 -0800)
notably when first job takes too long to load its prefix

lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c

index dadd126b9c8444ac4b7613564d8040d31fc88d61..52c5a986549a62debe0116e80fe5334bcd1644de 100644 (file)
@@ -4962,7 +4962,7 @@ ZSTD_loadDictionaryContent(ZSTD_MatchState_t* ms,
     }
 
     /* If the dict is larger than we can reasonably index in our tables, only load the suffix. */
-    {   U32 maxDictSize = 1U << MIN(MAX(params->cParams.hashLog + 3, params->cParams.chainLog + 1), 31);
+    {   U32 const maxDictSize = 1U << MIN(MAX(params->cParams.hashLog + 3, params->cParams.chainLog + 1), 31);
         if (srcSize > maxDictSize) {
             ip = iend - maxDictSize;
             src = ip;
index 8725b90716f92978bdc506ec6cffd04aa23d03f2..3b572e4b00875d97b8f79a58c0a1317341ad4873 100644 (file)
@@ -907,7 +907,8 @@ static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZS
  * update *nbJobsPtr to next power of 2 value, as size of table */
 static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
 {
-    U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
+    U32 const margin = MAX(4, *nbJobsPtr / 2);
+    U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr + margin) + 1;
     U32 const nbJobs = 1 << nbJobsLog2;
     U32 jobNb;
     ZSTDMT_jobDescription* const jobTable = (ZSTDMT_jobDescription*)
@@ -927,8 +928,9 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom
 }
 
 static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {
-    U32 nbJobs = nbWorkers + 2;
-    if (nbJobs > mtctx->jobIDMask+1) {  /* need more job capacity */
+    U32 const margin = MAX(4, nbWorkers);
+    U32 nbJobs = nbWorkers + margin;
+    if (nbJobs >= mtctx->jobIDMask) {  /* need more job capacity */
         ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
         mtctx->jobIDMask = 0;
         mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
@@ -947,7 +949,8 @@ static size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned n
     return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers);
 }
 
-MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
+MEM_STATIC ZSTDMT_CCtx*
+ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
 {
     ZSTDMT_CCtx* mtctx;
     U32 nbJobs = nbWorkers + 2;
@@ -1388,6 +1391,17 @@ static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
     assert(job->consumed == 0);
 }
 
+/* @returns 1 if there is anything ready to flush */
+static int ZSTDMT_anythingToFlush(const ZSTDMT_CCtx* mtctx)
+{
+    unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
+    int r = 0;
+    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
+    r = mtctx->jobs[wJobID].dstFlushed < mtctx->jobs[wJobID].cSize;
+    ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
+    return r;
+}
+
 static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
 {
     unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask;
@@ -1456,13 +1470,22 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
                 mtctx->jobs[jobID].lastJob,
                 mtctx->nextJobID,
                 jobID);
-    if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
+
+    if (ZSTDMT_anythingToFlush(mtctx)) {
+        if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
+            mtctx->nextJobID++;
+            mtctx->jobReady = 0;
+        } else {
+            DEBUGLOG(2, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
+            mtctx->jobReady = 1;
+        }
+    } else {
+        /* block here, wait for next available job */
+        POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID]);
         mtctx->nextJobID++;
         mtctx->jobReady = 0;
-    } else {
-        DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID);
-        mtctx->jobReady = 1;
     }
+
     return 0;
 }