]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
main thread is awaken when a job position is freed
authorYann Collet <cyan@fb.com>
Sun, 2 Feb 2025 03:42:23 +0000 (19:42 -0800)
committerYann Collet <cyan@fb.com>
Sun, 2 Feb 2025 03:42:23 +0000 (19:42 -0800)
resulting in less "wasted" idle time

lib/common/pool.c
lib/common/pool.h
lib/compress/zstdmt_compress.c

index 3adcefc9a50aedbbaacef5fa43277ddb6109b435..39f38dbeae85c9a32f4860027e02c7a22b78c716 100644 (file)
@@ -57,6 +57,11 @@ struct POOL_ctx_s {
     ZSTD_pthread_cond_t queuePopCond;
     /* Indicates if the queue is shutting down */
     int shutdown;
+
+    /* external mutex for the external condition */
+    ZSTD_pthread_mutex_t* extMutex;
+    /* external condition variable to set when a job is completed */
+    ZSTD_pthread_cond_t* extCond;
 };
 
 /* POOL_thread() :
@@ -89,7 +94,15 @@ static void* POOL_thread(void* opaque) {
             ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
             /* Unlock the mutex, signal a pusher, and run the job */
             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
-            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+            if (ctx->extMutex != NULL) {
+                assert(ctx->extCond != NULL);
+                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+                ZSTD_pthread_mutex_lock(ctx->extMutex);
+                ZSTD_pthread_cond_signal(ctx->extCond);
+                ZSTD_pthread_mutex_unlock(ctx->extMutex);
+            } else {
+                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+            }
 
             job.function(job.opaque);
 
@@ -97,7 +110,15 @@ static void* POOL_thread(void* opaque) {
             ZSTD_pthread_mutex_lock(&ctx->queueMutex);
             ctx->numThreadsBusy--;
             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
-            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+            if (ctx->extMutex != NULL) {
+                assert(ctx->extCond != NULL);
+                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+                ZSTD_pthread_mutex_lock(ctx->extMutex);
+                ZSTD_pthread_cond_signal(ctx->extCond);
+                ZSTD_pthread_mutex_unlock(ctx->extMutex);
+            } else {
+                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+            }
         }
     }  /* for (;;) */
     assert(0);  /* Unreachable */
@@ -138,6 +159,8 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
         error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
         if (error) { POOL_free(ctx); return NULL; }
     }
+    ctx->extMutex = NULL;
+    ctx->extCond = NULL;
     ctx->shutdown = 0;
     /* Allocate space for the thread handles */
     ctx->threads = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
@@ -210,6 +233,14 @@ size_t POOL_sizeof(const POOL_ctx* ctx) {
         + ctx->threadCapacity * sizeof(ZSTD_pthread_t);
 }
 
+void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond)
+{
+    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+    ctx->extMutex = mutex;
+    ctx->extCond = cond;
+    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+    return;
+}
 
 /* @return : 0 on success, 1 on error */
 static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
@@ -309,6 +340,14 @@ int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
     return 1;
 }
 
+int POOL_canAcceptJob(POOL_ctx* ctx)
+{
+    int r;
+    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+    r = !isQueueFull(ctx);
+    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+    return r;
+}
 
 #else  /* ZSTD_MULTITHREAD  not defined */
 
@@ -368,4 +407,10 @@ size_t POOL_sizeof(const POOL_ctx* ctx) {
     return sizeof(*ctx);
 }
 
+void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond)
+{
+    (void)ctx; (void)mutex; (void)cond;
+    return;
+}
+
 #endif  /* ZSTD_MULTITHREAD */
index f39b7f1eb99e7f90485746df68275acfb93c959e..7c06cacfb6fc90dd2b89f5846f74e3b7e4ad1906 100644 (file)
@@ -15,6 +15,7 @@
 #include "zstd_deps.h"
 #define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_customMem */
 #include "../zstd.h"
+#include "threading.h" /* ZSTD_pthread_mutex_t, ZSTD_pthread_cond_t */
 
 typedef struct POOL_ctx_s POOL_ctx;
 
@@ -35,11 +36,6 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
 void POOL_free(POOL_ctx* ctx);
 
 
-/*! POOL_joinJobs() :
- *  Waits for all queued jobs to finish executing.
- */
-void POOL_joinJobs(POOL_ctx* ctx);
-
 /*! POOL_resize() :
  *  Expands or shrinks pool's number of threads.
  *  This is more efficient than releasing + creating a new context,
@@ -57,6 +53,13 @@ int POOL_resize(POOL_ctx* ctx, size_t numThreads);
  */
 size_t POOL_sizeof(const POOL_ctx* ctx);
 
+
+/*! POOL_sizeof() :
+ * Pass a condition (and its associated mutex) to set whenever a job slot get freed.
+ * Note: can pass NULL to disable currently set condition.
+ */
+void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond);
+
 /*! POOL_function :
  *  The function type that can be added to a thread pool.
  */
@@ -70,7 +73,6 @@ typedef void (*POOL_function)(void*);
  */
 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque);
 
-
 /*! POOL_tryAdd() :
  *  Add the job `function(opaque)` to thread pool _if_ a queue slot is available.
  *  Returns immediately even if not (does not block).
@@ -78,4 +80,15 @@ void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque);
  */
 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque);
 
+/*! POOL_canAcceptJob() :
+ *  Tells if will be able to accept a new job without blocking.
+ * @return : 1 if true, 0 if not (queue full)
+ */
+int POOL_canAcceptJob(POOL_ctx* ctx);
+
+/*! POOL_joinJobs() :
+ *  Waits for all queued jobs to finish executing.
+ */
+void POOL_joinJobs(POOL_ctx* ctx);
+
 #endif
index 4af236fd00750e0e69e7a50fd3f2e0ff5cd341e9..91f158084f78cd833f8091340c3e16e21da012e3 100644 (file)
@@ -1019,6 +1019,7 @@ ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZST
         ZSTDMT_freeCCtx(mtctx);
         return NULL;
     }
+    POOL_setExtCond(mtctx->factory, &mtctx->flushMutex, &mtctx->flushCond);
     DEBUGLOG(3, "mt_cctx created, for %u threads", nbWorkers);
     return mtctx;
 }
@@ -1350,10 +1351,12 @@ size_t ZSTDMT_initCStream_internal(
          * for the overlap (if > 0), then one to fill which doesn't overlap
          * with the LDM window.
          */
-        size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);
+        size_t const nbWorkers = MAX((size_t)mtctx->params.nbWorkers, 1);
+        size_t const minSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);
+        size_t const extraSlackBuffers = MAX(1, nbWorkers/4);  /* for fluidity, when jobs are completed out of order */
+        size_t const nbSlackBuffers = minSlackBuffers + extraSlackBuffers;
         size_t const slackSize = mtctx->targetJobSize * nbSlackBuffers;
         /* Compute the total size, and always have enough slack */
-        size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);
         size_t const sectionsSize = mtctx->targetJobSize * nbWorkers;
         size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;
         if (mtctx->roundBuff.capacity < capacity) {
@@ -1431,17 +1434,6 @@ static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
     assert(job->consumed == 0);
 }
 
-/* @returns 1 if there is anything ready to flush */
-static int ZSTDMT_anythingToFlush(const ZSTDMT_CCtx* mtctx)
-{
-    unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
-    int r = 0;
-    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
-    r = mtctx->jobs[wJobID].dstFlushed < mtctx->jobs[wJobID].cSize;
-    ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
-    return r;
-}
-
 static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
 {
     unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask;
@@ -1513,23 +1505,15 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
                 mtctx->nextJobID,
                 jobID);
 
-    if (1 || ZSTDMT_anythingToFlush(mtctx)) {
-        if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
-            mtctx->nextJobID++;
-            mtctx->jobReady = 0;
-            return 1;
-        } else {
-            DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
-            mtctx->jobReady = 1;
-            return 0;
-        }
-    } else {
-        /* block here, wait for next available job */
-        POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID]);
+    if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
         mtctx->nextJobID++;
         mtctx->jobReady = 0;
+        return 1;
     }
-    return 1;
+
+    DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
+    mtctx->jobReady = 1;
+    return 0;
 }
 
 
@@ -1556,17 +1540,20 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
                             mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].src.size);
                 break;
             }
-            DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
-                        mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
+            DEBUGLOG(5, "waiting for something to flush from job %u (%u input left)",
+                        mtctx->doneJobID, (unsigned)(mtctx->jobs[wJobID].src.size - mtctx->jobs[wJobID].consumed));
             if (mtctx->jobs[wJobID].flush_mutex == NULL) {
                 mtctx->jobs[wJobID].flush_mutex = &mtctx->flushMutex;
                 mtctx->jobs[wJobID].flush_cond = &mtctx->flushCond;
             }
-            DEBUGLOG(6, "waiting to flush something (%zu left)",  mtctx->jobs[wJobID].src.size - mtctx->jobs[wJobID].consumed);
             ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
+            /* note: if a job was completed between POOL_tryAdd() and this waiting condition,
+             * the signal, which was already issued, will be lost.
+             * It just reduces an opportunity to start a new job immediately */
             ZSTD_PTHREAD_COND_WAIT(&mtctx->flushCond, &mtctx->flushMutex);  /* block waiting for something to flush */
             ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
-            DEBUGLOG(6, "condition triggered: let's flush something (%zu bytes)", mtctx->jobs[wJobID].cSize - mtctx->jobs[wJobID].dstFlushed);
+            DEBUGLOG(6, "flushCond triggered: let's flush something (%zu bytes)", mtctx->jobs[wJobID].cSize - mtctx->jobs[wJobID].dstFlushed);
+            break; /* can be triggered with nothing to flush, when a job was just completed */
     }   }
 
     /* try to flush something */